diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 60dfee0e9209..dda272f45621 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -236,7 +236,8 @@ void Init( yqSharedResources->UserSpaceYdbDriver, appData->Mon, appData->Counters, - MakeNodesManagerId()); + MakeNodesManagerId(), + true); actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release()); } @@ -259,7 +260,7 @@ void Init( ); auto pqGateway = pqGatewayFactory ? pqGatewayFactory->CreatePqGateway() : NYql::CreatePqNativeGateway(std::move(pqServices)); RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, - yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), commonConfig.GetPqReconnectPeriod()); + yqCounters->GetSubgroup("subsystem", "DqSourceTracker"), commonConfig.GetPqReconnectPeriod(), true); s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg, yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles()); @@ -267,7 +268,7 @@ void Init( httpGateway, s3HttpRetryPolicy); RegisterGenericProviderFactories(*asyncIoFactory, credentialsFactory, connectorClient); - RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, yqCounters->GetSubgroup("subsystem", "DqSinkTracker")); + RegisterDqPqWriteActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, pqGateway, yqCounters->GetSubgroup("subsystem", "DqSinkTracker"), true); RegisterDQSolomonWriteActorFactory(*asyncIoFactory, credentialsFactory); RegisterDQSolomonReadActorFactory(*asyncIoFactory, credentialsFactory); } diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp index ca69d6bbaef3..e7cb52cb5607 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.cpp @@ -23,7 +23,8 @@ struct TActorFactory : public IActorFactory { const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) const override { + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) const override { auto actorPtr = NFq::NewTopicSession( readGroup, @@ -40,7 +41,8 @@ struct TActorFactory : public IActorFactory { counters, countersRoot, pqGateway, - maxBufferSize + maxBufferSize, + enableStreamingQueriesCounters ); return NActors::TActivationContext::Register(actorPtr.release(), {}, NActors::TMailboxType::HTSwap, Max()); } diff --git a/ydb/core/fq/libs/row_dispatcher/actors_factory.h b/ydb/core/fq/libs/row_dispatcher/actors_factory.h index 1d1814953baa..08c12ca0b28d 100644 --- a/ydb/core/fq/libs/row_dispatcher/actors_factory.h +++ b/ydb/core/fq/libs/row_dispatcher/actors_factory.h @@ -27,7 +27,8 @@ struct IActorFactory : public TThrRefBase { const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) const = 0; + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) const = 0; }; IActorFactory::TPtr CreateActorFactory(); diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp index a7aea662d7c1..d742e509514e 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp @@ -5,6 +5,9 @@ #include "leader_election.h" #include "probes.h" +#include +#include + #include #include #include @@ -117,14 +120,17 @@ struct TQueryStatKeyHash { struct TAggQueryStat { TAggQueryStat() = default; - TAggQueryStat(const TString& queryId, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::NPq::NProto::TDqPqTopicSource& sourceParams) + TAggQueryStat(const TString& queryId, const ::NMonitoring::TDynamicCounterPtr& counters, const NYql::NPq::NProto::TDqPqTopicSource& sourceParams, bool enableStreamingQueriesCounters) : QueryId(queryId) , SubGroup(counters) { - for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { - SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + auto topicGroup = SubGroup; + if (enableStreamingQueriesCounters) { + for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { + SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } + SubGroup = SubGroup->GetSubgroup("query_id", queryId); + topicGroup = SubGroup->GetSubgroup("read_group", SanitizeLabel(sourceParams.GetReadGroup())); } - auto queryGroup = SubGroup->GetSubgroup("query_id", queryId); - auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(sourceParams.GetReadGroup())); MaxQueuedBytesCounter = topicGroup->GetCounter("MaxQueuedBytes"); AvgQueuedBytesCounter = topicGroup->GetCounter("AvgQueuedBytes"); MaxReadLagCounter = topicGroup->GetCounter("MaxReadLag"); @@ -411,6 +417,7 @@ class TRowDispatcher : public TActorBootstrapped { TMap> ConsumersByEventQueueId; THashMap TopicSessions; TMap ReadActorsInternalState; + bool EnableStreamingQueriesCounters = false; public: explicit TRowDispatcher( @@ -425,7 +432,8 @@ class TRowDispatcher : public TActorBootstrapped { const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring = nullptr, - NActors::TActorId nodesManagerId = {} + NActors::TActorId nodesManagerId = {}, + bool enableStreamingQueriesCounters = false ); void Bootstrap(); @@ -509,7 +517,8 @@ TRowDispatcher::TRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring, - NActors::TActorId nodesManagerId) + NActors::TActorId nodesManagerId, + bool enableStreamingQueriesCounters) : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) , CredentialsFactory(credentialsFactory) @@ -525,6 +534,7 @@ TRowDispatcher::TRowDispatcher( , Driver(driver) , Monitoring(monitoring) , NodesManagerId(nodesManagerId) + , EnableStreamingQueriesCounters(enableStreamingQueriesCounters) { Y_ENSURE(!Tenant.empty()); } @@ -665,7 +675,7 @@ void TRowDispatcher::UpdateMetrics() { TQueryStatKey statKey{consumer->QueryId, key.ReadGroup}; auto& stats = AggrStats.LastQueryStats.emplace( statKey, - TAggQueryStat(consumer->QueryId, Metrics.Counters, consumer->SourceParams)).first->second; + TAggQueryStat(consumer->QueryId, Metrics.Counters, consumer->SourceParams, EnableStreamingQueriesCounters)).first->second; stats.Add(partition.Stat, partition.FilteredBytes); partition.FilteredBytes = 0; } @@ -841,9 +851,11 @@ void TRowDispatcher::UpdateReadActorsInternalState() { void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { LOG_ROW_DISPATCHER_DEBUG("Received TEvStartSession from " << ev->Sender << ", read group " << ev->Get()->Record.GetSource().GetReadGroup() << ", topicPath " << ev->Get()->Record.GetSource().GetTopicPath() << " part id " << JoinSeq(',', ev->Get()->Record.GetPartitionIds()) << " query id " << ev->Get()->Record.GetQueryId() << " cookie " << ev->Cookie); - auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); - auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(ev->Get()->Record.GetSource().GetReadGroup())); - topicGroup->GetCounter("StartSession", true)->Inc(); + if (EnableStreamingQueriesCounters) { + auto queryGroup = Metrics.Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId()); + auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(ev->Get()->Record.GetSource().GetReadGroup())); + topicGroup->GetCounter("StartSession", true)->Inc(); + } LWPROBE(StartSession, ev->Sender.ToString(), ev->Get()->Record.GetQueryId(), ev->Get()->Record.ByteSizeLong()); @@ -896,7 +908,8 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { Counters, CountersRoot, PqGateway, - MaxSessionBufferSizeBytes + MaxSessionBufferSizeBytes, + EnableStreamingQueriesCounters ); TSessionInfo& sessionInfo = topicSessionInfo.Sessions[sessionActorId]; sessionInfo.Consumers[ev->Sender] = consumerInfo; @@ -1303,7 +1316,8 @@ std::unique_ptr NewRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring, - NActors::TActorId nodesManagerId) + NActors::TActorId nodesManagerId, + bool enableStreamingQueriesCounters) { return std::unique_ptr(new TRowDispatcher( config, @@ -1317,7 +1331,8 @@ std::unique_ptr NewRowDispatcher( pqGateway, driver, monitoring, - nodesManagerId)); + nodesManagerId, + enableStreamingQueriesCounters)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h index 66f021c65760..88b942f3b684 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher.h @@ -31,6 +31,7 @@ std::unique_ptr NewRowDispatcher( const NYql::IPqGateway::TPtr& pqGateway, NYdb::TDriver driver, NActors::TMon* monitoring = nullptr, - NActors::TActorId nodesManagerId = {}); + NActors::TActorId nodesManagerId = {}, + bool enableStreamingQueriesCounters = false); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp index f4447ed681e5..0c99fa3ffece 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.cpp @@ -20,7 +20,8 @@ std::unique_ptr NewRowDispatcherService( NYdb::TDriver driver, NActors::TMon* monitoring, ::NMonitoring::TDynamicCounterPtr countersRoot, - NActors::TActorId nodesManagerId) + NActors::TActorId nodesManagerId, + bool enableStreamingQueriesCounters) { return NewRowDispatcher( config, @@ -34,7 +35,8 @@ std::unique_ptr NewRowDispatcherService( pqGateway, driver, monitoring, - nodesManagerId); + nodesManagerId, + enableStreamingQueriesCounters); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h index da5b19f96d8c..0ebb57c3b0b7 100644 --- a/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h +++ b/ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h @@ -28,6 +28,7 @@ std::unique_ptr NewRowDispatcherService( NYdb::TDriver driver, NActors::TMon* monitoring = nullptr, ::NMonitoring::TDynamicCounterPtr countersRoot = MakeIntrusive<::NMonitoring::TDynamicCounters>(), - NActors::TActorId nodesManagerId = {}); + NActors::TActorId nodesManagerId = {}, + bool enableStreamingQueriesCounters = false); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index 90aeaa53e2e2..3a9d3288f79f 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -1,5 +1,8 @@ #include "topic_session.h" +#include +#include + #include #include #include @@ -23,11 +26,14 @@ namespace { //////////////////////////////////////////////////////////////////////////////// struct TTopicSessionMetrics { - void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroup, ui32 partitionId) { - TopicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath)); - ReadGroup = TopicGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); - PartitionGroup = ReadGroup->GetSubgroup("partition", ToString(partitionId)); - + void Init(const ::NMonitoring::TDynamicCounterPtr& counters, const TString& topicPath, const TString& readGroupName, ui32 partitionId, bool enableStreamingQueriesCounters) { + ReadGroup = counters; + PartitionGroup = counters; + if (enableStreamingQueriesCounters) { + const auto topicGroup = counters->GetSubgroup("topic", SanitizeLabel(topicPath)); + ReadGroup = topicGroup->GetSubgroup("read_group", SanitizeLabel(readGroupName)); + PartitionGroup = ReadGroup->GetSubgroup("partition", ToString(partitionId)); + } AllSessionsDataRate = ReadGroup->GetCounter("AllSessionsDataRate", true); InFlyAsyncInputData = PartitionGroup->GetCounter("InFlyAsyncInputData"); InFlySubscribe = PartitionGroup->GetCounter("InFlySubscribe"); @@ -37,10 +43,8 @@ struct TTopicSessionMetrics { WaitEventTimeMs = PartitionGroup->GetHistogram("WaitEventTimeMs", NMonitoring::ExplicitHistogram({5, 20, 100, 500, 2000})); QueuedBytes = PartitionGroup->GetCounter("QueuedBytes"); } - - ::NMonitoring::TDynamicCounterPtr TopicGroup; - ::NMonitoring::TDynamicCounterPtr ReadGroup; ::NMonitoring::TDynamicCounterPtr PartitionGroup; + ::NMonitoring::TDynamicCounterPtr ReadGroup; ::NMonitoring::TDynamicCounters::TCounterPtr InFlyAsyncInputData; ::NMonitoring::TDynamicCounters::TCounterPtr InFlySubscribe; ::NMonitoring::TDynamicCounters::TCounterPtr ReconnectRate; @@ -101,7 +105,7 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven struct TClientsInfo : public IClientDataConsumer { using TPtr = TIntrusivePtr; - TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup, TMaybe offset) + TClientsInfo(TTopicSession& self, const TString& logPrefix, const ITopicFormatHandler::TSettings& handlerSettings, const NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev, const NMonitoring::TDynamicCounterPtr& counters, const TString& readGroup, TMaybe offset, bool enableStreamingQueriesCounters) : Self(self) , LogPrefix(logPrefix) , HandlerSettings(handlerSettings) @@ -122,11 +126,15 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven InitialOffset = *offset; } Y_UNUSED(TDuration::TryParse(ev->Get()->Record.GetSource().GetReconnectPeriod(), ReconnectPeriod)); - for (const auto& sensor : ev->Get()->Record.GetSource().GetTaskSensorLabel()) { - Counters = Counters->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + + auto readSubGroup = Counters; + if (enableStreamingQueriesCounters) { + for (const auto& sensor : ev->Get()->Record.GetSource().GetTaskSensorLabel()) { + Counters = Counters->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } + readSubGroup = readSubGroup->GetSubgroup("query_id", QueryId); + readSubGroup = readSubGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); } - auto queryGroup = Counters->GetSubgroup("query_id", QueryId); - auto readSubGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup)); FilteredDataRate = readSubGroup->GetCounter("FilteredDataRate", true); RestartSessionByOffsetsByQuery = readSubGroup->GetCounter("RestartSessionByOffsetsByQuery", true); @@ -298,6 +306,7 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven TTopicSessionMetrics Metrics; const ::NMonitoring::TDynamicCounterPtr Counters; const ::NMonitoring::TDynamicCounterPtr CountersRoot; + bool EnableStreamingQueriesCounters = false; public: TTopicSession( @@ -315,7 +324,8 @@ class TTopicSession : public TActorBootstrapped, NYql::TTopicEven const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize); + ui64 maxBufferSize, + bool enableStreamingQueriesCounters); void Bootstrap(); void PassAway() override; @@ -403,7 +413,8 @@ TTopicSession::TTopicSession( const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) : ReadGroup(readGroup) , TopicPath(topicPath) , TopicPathPartition(TStringBuilder() << topicPath << "/" << partitionId) @@ -421,11 +432,12 @@ TTopicSession::TTopicSession( , LogPrefix("TopicSession") , Counters(counters) , CountersRoot(countersRoot) + , EnableStreamingQueriesCounters(enableStreamingQueriesCounters) {} void TTopicSession::Bootstrap() { Become(&TTopicSession::StateFunc); - Metrics.Init(Counters, TopicPath, ReadGroup, PartitionId); + Metrics.Init(Counters, TopicPath, ReadGroup, PartitionId, EnableStreamingQueriesCounters); LogPrefix = LogPrefix + " " + SelfId().ToString() + " "; LOG_ROW_DISPATCHER_DEBUG("Bootstrap " << TopicPathPartition << ", Timeout " << Config.GetTimeoutBeforeStartSession() << " sec"); @@ -793,7 +805,7 @@ void TTopicSession::Handle(NFq::TEvRowDispatcher::TEvStartSession::TPtr& ev) { const TString& format = source.GetFormat(); ITopicFormatHandler::TSettings handlerSettings = {.ParsingFormat = format ? format : "raw"}; - auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset)}).first->second; + auto clientInfo = Clients.insert({ev->Sender, MakeIntrusive(*this, LogPrefix, handlerSettings, ev, Counters, ReadGroup, offset, EnableStreamingQueriesCounters)}).first->second; auto formatIt = FormatHandlers.find(handlerSettings); if (formatIt == FormatHandlers.end()) { auto config = CreateFormatHandlerConfig(Config, FunctionRegistry, CompileServiceActorId, source.GetSkipJsonErrors()); @@ -1062,8 +1074,9 @@ std::unique_ptr NewTopicSession( const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize) { - return std::unique_ptr(new TTopicSession(readGroup, topicPath, endpoint, database, config, functionRegistry, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize)); + ui64 maxBufferSize, + bool enableStreamingQueriesCounters) { + return std::unique_ptr(new TTopicSession(readGroup, topicPath, endpoint, database, config, functionRegistry, rowDispatcherActorId, compileServiceActorId, partitionId, std::move(driver), credentialsProviderFactory, counters, countersRoot, pqGateway, maxBufferSize, enableStreamingQueriesCounters)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.h b/ydb/core/fq/libs/row_dispatcher/topic_session.h index f71d741ad694..de498fff03c6 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.h +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.h @@ -25,6 +25,7 @@ std::unique_ptr NewTopicSession( const ::NMonitoring::TDynamicCounterPtr& counters, const ::NMonitoring::TDynamicCounterPtr& countersRoot, const NYql::IPqGateway::TPtr& pqGateway, - ui64 maxBufferSize); + ui64 maxBufferSize, + bool enableStreamingQueriesCounters); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp index 5d6b1f80bf26..dc069c4d2086 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp @@ -43,7 +43,8 @@ struct TTestActorFactory : public NFq::NRowDispatcher::IActorFactory { const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const ::NMonitoring::TDynamicCounterPtr& /*counters*/, const NYql::IPqGateway::TPtr& /*pqGateway*/, - ui64 /*maxBufferSize*/) const override { + ui64 /*maxBufferSize*/, + bool /*enableStreamingQueriesCounters*/) const override { auto actorId = Runtime.AllocateEdgeActor(); ActorIds.push(actorId); return actorId; diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 11eedb13148a..1879d0218f89 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -90,7 +90,8 @@ class TFixture : public NTests::TBaseFixture { MakeIntrusive(), MakeIntrusive(), !MockTopicSession ? CreatePqNativeGateway(pqServices) : MockPqGateway, - 16000000 + 16000000, + true ).release()); Runtime.EnableScheduleForActor(TopicSession); } diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index ee1fdd881a61..47853ac35f60 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -109,8 +109,9 @@ NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory( NYql::NDq::RegisterDQSolomonReadActorFactory(*factory, federatedQuerySetup->CredentialsFactory); NYql::NDq::RegisterDQSolomonWriteActorFactory(*factory, federatedQuerySetup->CredentialsFactory); - NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr); - NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, nullptr); + bool enableStreamingQueriesCounters = NKikimr::AppData()->FeatureFlags.GetEnableStreamingQueriesCounters(); + NYql::NDq::RegisterDqPqReadActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSourceTracker"), {}, enableStreamingQueriesCounters); + NYql::NDq::RegisterDqPqWriteActorFactory(*factory, *federatedQuerySetup->Driver, federatedQuerySetup->CredentialsFactory, federatedQuerySetup->PqGateway, counters->GetKqpCounters()->GetSubgroup("subsystem", "DqSinkTracker"), enableStreamingQueriesCounters); } return factory; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 6e7c17681ef0..b289d289e3cf 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2814,13 +2814,17 @@ class TKqpDataExecuter : public TKqpExecuterBaseCounters->GetKqpCounters(); + if (AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()) { + counters = counters->GetSubgroup("path", context->StreamingQueryPath); + } const auto& checkpointId = context->CheckpointId; CheckpointCoordinatorId = Register(MakeCheckpointCoordinator( ::NFq::TCoordinatorId(checkpointId, Generation), NYql::NDq::MakeCheckpointStorageID(), SelfId(), {}, - Counters->Counters->GetKqpCounters()->GetSubgroup("path", context->StreamingQueryPath), + counters, graphParams, stateLoadMode, streamingDisposition).Release()); diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp index 14a9b61c38de..152d54f95501 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp @@ -2492,6 +2492,10 @@ void TKqpTasksGraph::BuildReadTasksFromSource(TStageInfo& stageInfo, const TVect FillReadTaskFromSource(task, sourceName, structuredToken, resourceSnapshot, nodeOffset++); + if (GetMeta().UserRequestContext && GetMeta().UserRequestContext->StreamingQueryPath) { + task.Meta.TaskParams.emplace("query_path", GetMeta().UserRequestContext->StreamingQueryPath); + } + tasksIds.push_back(task.Id); } @@ -2826,6 +2830,10 @@ void TKqpTasksGraph::BuildExternalSinks(const NKqpProto::TKqpSink& sink, TKqpTas } } + if (GetMeta().UserRequestContext && GetMeta().UserRequestContext->StreamingQueryPath) { + task.Meta.TaskParams.emplace("query_path", GetMeta().UserRequestContext->StreamingQueryPath); + } + auto& output = task.Outputs[sink.GetOutputIndex()]; output.Type = TTaskOutputType::Sink; output.SinkType = extSink.GetType(); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index e5b8915e412c..17061a113086 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1806,7 +1806,9 @@ class TKqpProxyService : public TActorBootstrapped { FederatedQuerySetup->PqGateway, *FederatedQuerySetup->Driver, AppData()->Mon, - Counters->GetKqpCounters()); + Counters->GetKqpCounters(), + {}, + AppData()->FeatureFlags.GetEnableStreamingQueriesCounters()); RowDispatcherService = TActivationContext::Register(rowDispatcher.release()); TActivationContext::ActorSystem()->RegisterLocalService( @@ -1823,7 +1825,7 @@ class TKqpProxyService : public TActorBootstrapped { "cs", NKikimr::CreateYdbCredentialsProviderFactory, *FederatedQuerySetup->Driver, - Counters->GetKqpCounters()->GetSubgroup("subsystem", "storage_service")); + Counters->GetKqpCounters()->GetSubgroup("subsystem", "checkpoints_storage_service")); CheckpointStorageService = TActivationContext::Register(service.release()); TActivationContext::ActorSystem()->RegisterLocalService( diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto index 2af30de95010..5b331ec1d65a 100644 --- a/ydb/core/protos/feature_flags.proto +++ b/ydb/core/protos/feature_flags.proto @@ -241,4 +241,5 @@ message TFeatureFlags { optional bool DisableMissingDefaultColumnsInBulkUpsert = 215 [default = true]; optional bool EnableColumnTablesBackup = 216 [default = true]; optional bool EnableReplication = 217 [default = true]; + optional bool EnableStreamingQueriesCounters = 218 [default = false]; } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 52462acc3ac3..6d3767213fef 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include #include @@ -77,7 +79,7 @@ LWTRACE_USING(DQ_PQ_PROVIDER); } // namespace struct TRowDispatcherReadActorMetrics { - explicit TRowDispatcherReadActorMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, const NPq::NProto::TDqPqTopicSource& sourceParams) + explicit TRowDispatcherReadActorMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, const NPq::NProto::TDqPqTopicSource& sourceParams, bool enableStreamingQueriesCounters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { if (Counters) { @@ -85,11 +87,15 @@ struct TRowDispatcherReadActorMetrics { } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } - for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { - SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + + auto task = SubGroup; + if (enableStreamingQueriesCounters) { + for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { + SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } + auto source = SubGroup->GetSubgroup("tx_id", TxId); + task = source->GetSubgroup("task_id", ToString(taskId)); } - auto source = SubGroup->GetSubgroup("tx_id", TxId); - auto task = source->GetSubgroup("task_id", ToString(taskId)); InFlyGetNextBatch = task->GetCounter("InFlyGetNextBatch"); InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData"); ReInit = task->GetCounter("ReInit", true); @@ -318,6 +324,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: // Set on Parent ui64 NextGeneration = 0; ui64 NextEventQueueId = 0; + bool EnableStreamingQueriesCounters = false; TMap> LastUsedPartitionDistribution; TMap> LastReceivedPartitionDistribution; @@ -341,6 +348,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters, TDqPqRdReadActor* parent = nullptr, const TString& cluster = {}); @@ -527,6 +535,7 @@ TDqPqRdReadActor::TDqPqRdReadActor( const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters, TDqPqRdReadActor* parent, const TString& cluster) : TActor(&TDqPqRdReadActor::StateFunc) @@ -535,13 +544,14 @@ TDqPqRdReadActor::TDqPqRdReadActor( , Cluster(cluster) , Token(token) , LocalRowDispatcherActorId(localRowDispatcherActorId) - , Metrics(txId, taskId, counters, SourceParams) + , Metrics(txId, taskId, counters, SourceParams, enableStreamingQueriesCounters) , PqGateway(pqGateway) , HolderFactory(holderFactory) , TypeEnv(typeEnv) , Driver(std::move(driver)) , CredentialsProviderFactory(std::move(credentialsProviderFactory)) , MaxBufferSize(bufferSize) + , EnableStreamingQueriesCounters(enableStreamingQueriesCounters) { SRC_LOG_I("Start read actor, local row dispatcher " << LocalRowDispatcherActorId.ToString() << ", metadatafields: " << JoinSeq(',', SourceParams.GetMetadataFields()) << ", partitions: " << JoinSeq(',', GetPartitionsToRead()) << ", skip json errors: " << SourceParams.GetSkipJsonErrors()); @@ -1536,6 +1546,7 @@ void TDqPqRdReadActor::StartCluster(ui32 clusterIndex) { {}, MaxBufferSize, PqGateway, + EnableStreamingQueriesCounters, this, TString(Clusters[clusterIndex].Info.Name)); Clusters[clusterIndex].Child = actor; @@ -1576,7 +1587,8 @@ std::pair CreateDqPqRdReadActor( const NKikimr::NMiniKQL::THolderFactory& holderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, - const IPqGateway::TPtr& pqGateway) + const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters) { const TString& tokenName = settings.GetToken().GetName(); const TString token = secureParams.Value(tokenName, TString()); @@ -1598,7 +1610,8 @@ std::pair CreateDqPqRdReadActor( token, counters, bufferSize, - pqGateway); + pqGateway, + enableStreamingQueriesCounters); return {actor, actor}; } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h index e470baa257b9..536830269a68 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.h @@ -39,6 +39,7 @@ std::pair CreateDqPqRdReadActor( const NKikimr::NMiniKQL::THolderFactory& holderFactory, const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, - const IPqGateway::TPtr& pqGateway); + const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index bba1ffd8d274..2fcd6ec2b3f7 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -4,6 +4,9 @@ #include "dq_pq_read_actor_base.h" #include "probes.h" +#include +#include + #include #include #include @@ -134,7 +137,8 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, - const NPq::NProto::TDqPqTopicSource& sourceParams) + const NPq::NProto::TDqPqTopicSource& sourceParams, + bool enableStreamingQueriesCounters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { if (counters) { @@ -143,11 +147,15 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } - for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { - SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + auto source = SubGroup; + auto task = SubGroup; + if (enableStreamingQueriesCounters) { + for (const auto& sensor : sourceParams.GetTaskSensorLabel()) { + SubGroup = SubGroup->GetSubgroup(sensor.GetLabel(), sensor.GetValue()); + } + source = source->GetSubgroup("tx_id", TxId); + task = source->GetSubgroup("task_id", ToString(taskId)); } - auto source = SubGroup->GetSubgroup("tx_id", TxId); - auto task = source->GetSubgroup("task_id", ToString(taskId)); InFlyAsyncInputData = task->GetCounter("InFlyAsyncInputData"); InFlySubscribe = task->GetCounter("InFlySubscribe"); AsyncInputDataRate = task->GetCounter("AsyncInputDataRate", true); @@ -207,10 +215,11 @@ class TDqPqReadActor : public NActors::TActor, public NYql::NDq: const ::NMonitoring::TDynamicCounterPtr& counters, i64 bufferSize, const IPqGateway::TPtr& pqGateway, - ui32 topicPartitionsCount) + ui32 topicPartitionsCount, + bool enableStreamingQueriesCounters) : TActor(&TDqPqReadActor::StateFunc) , TDqPqReadActorBase(inputIndex, taskId, this->SelfId(), txId, std::move(sourceParams), std::move(readParams), computeActorId) - , Metrics(txId, taskId, counters, SourceParams) + , Metrics(txId, taskId, counters, SourceParams, enableStreamingQueriesCounters) , BufferSize(bufferSize) , HolderFactory(holderFactory) , Driver(std::move(driver)) @@ -932,6 +941,7 @@ std::pair CreateDqPqReadActor( const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, ui32 topicPartitionsCount, + bool enableStreamingQueriesCounters, i64 bufferSize ) { @@ -953,15 +963,16 @@ std::pair CreateDqPqReadActor( counters, bufferSize, pqGateway, - topicPartitionsCount + topicPartitionsCount, + enableStreamingQueriesCounters ); return {actor, actor}; } -void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, const TString& reconnectPeriod) { +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, const TString& reconnectPeriod, bool enableStreamingQueriesCounters) { factory.RegisterSource("PqSource", - [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, reconnectPeriod]( + [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, reconnectPeriod, enableStreamingQueriesCounters]( NPq::NProto::TDqPqTopicSource&& settings, IDqAsyncIoFactory::TSourceArguments&& args) { @@ -974,12 +985,17 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv TVector readTaskParamsMsg; ui32 topicPartitionsCount = ExtractPartitionsFromParams(readTaskParamsMsg, args.TaskParams, args.ReadRanges); + auto txId = args.TxId; + auto taskParamsIt = args.TaskParams.find("query_path"); + if (taskParamsIt != args.TaskParams.end()) { + txId = taskParamsIt->second; + } if (!settings.GetSharedReading()) { return CreateDqPqReadActor( std::move(settings), args.InputIndex, args.StatsLevel, - args.TxId, + txId, args.TaskId, args.SecureParams, std::move(readTaskParamsMsg), @@ -990,6 +1006,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv counters ? counters : args.TaskCounters, pqGateway, topicPartitionsCount, + enableStreamingQueriesCounters, PQReadDefaultFreeSpace); } @@ -998,7 +1015,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv std::move(settings), args.InputIndex, args.StatsLevel, - args.TxId, + txId, args.TaskId, args.SecureParams, std::move(readTaskParamsMsg), @@ -1009,7 +1026,8 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv args.HolderFactory, counters ? counters : args.TaskCounters, PQReadDefaultFreeSpace, - pqGateway); + pqGateway, + enableStreamingQueriesCounters); }); } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h index d6c2fb2c732b..f06576d84b9d 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h @@ -38,9 +38,10 @@ std::pair CreateDqPqReadActor( const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, ui32 topicPartitionsCount, + bool enableStreamingQueriesCounters, i64 bufferSize = PQReadDefaultFreeSpace ); -void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), const TString& reconnectPeriod = {}); +void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), const TString& reconnectPeriod = {}, bool enableStreamingQueriesCounters = true); } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 1c18fe0eb2e6..b54bacdc4054 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -1,6 +1,9 @@ #include "dq_pq_write_actor.h" #include "probes.h" +#include +#include + #include #include #include @@ -102,7 +105,7 @@ TString MakeStringForLog(const NDqProto::TCheckpoint& checkpoint) { class TDqPqWriteActor : public NActors::TActor, public IDqComputeActorAsyncOutput, TTopicEventProcessor { struct TMetrics { - TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters) + TMetrics(const TTxId& txId, ui64 taskId, const ::NMonitoring::TDynamicCounterPtr& counters, bool enableStreamingQueriesCounters) : TxId(std::visit([](auto arg) { return ToString(arg); }, txId)) , Counters(counters) { if (Counters) { @@ -110,8 +113,11 @@ class TDqPqWriteActor : public NActors::TActor, public IDqCompu } else { SubGroup = MakeIntrusive<::NMonitoring::TDynamicCounters>(); } - auto sink = SubGroup->GetSubgroup("tx_id", TxId); - auto task = sink->GetSubgroup("task_id", ToString(taskId)); + auto task = SubGroup; + if (enableStreamingQueriesCounters) { + auto sink = SubGroup->GetSubgroup("tx_id", TxId); + task = sink->GetSubgroup("task_id", ToString(taskId)); + } LastAckLatency = task->GetCounter("LastAckLatencyMs"); InFlyCheckpoints = task->GetCounter("InFlyCheckpoints"); InFlyData = task->GetCounter("InFlyData"); @@ -157,11 +163,12 @@ class TDqPqWriteActor : public NActors::TActor, public IDqCompu IDqComputeActorAsyncOutput::ICallbacks* callbacks, const ::NMonitoring::TDynamicCounterPtr& counters, i64 freeSpace, - const IPqGateway::TPtr& pqGateway) + const IPqGateway::TPtr& pqGateway, + bool enableStreamingQueriesCounters) : TActor(&TDqPqWriteActor::StateFunc) , OutputIndex(outputIndex) , TxId(txId) - , Metrics(txId, taskId, counters) + , Metrics(txId, taskId, counters, enableStreamingQueriesCounters) , SinkParams(std::move(sinkParams)) , Driver(std::move(driver)) , CredentialsProviderFactory(credentialsProviderFactory) @@ -542,6 +549,7 @@ std::pair CreateDqPqWriteActor( IDqComputeActorAsyncOutput::ICallbacks* callbacks, const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, + bool enableStreamingQueriesCounters, i64 freeSpace) { const TString& tokenName = settings.GetToken().GetName(); @@ -559,29 +567,36 @@ std::pair CreateDqPqWriteActor( callbacks, counters, freeSpace, - pqGateway); + pqGateway, + enableStreamingQueriesCounters); return {actor, actor}; } -void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) { +void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters, bool enableStreamingQueriesCounters) { factory.RegisterSink("PqSink", - [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway]( + [driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway, enableStreamingQueriesCounters]( NPq::NProto::TDqPqTopicSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { + auto txId = args.TxId; + auto taskParamsIt = args.TaskParams.find("query_path"); + if (taskParamsIt != args.TaskParams.end()) { + txId = taskParamsIt->second; + } NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(DQ_PQ_PROVIDER)); return CreateDqPqWriteActor( std::move(settings), args.OutputIndex, args.StatsLevel, - args.TxId, + txId, args.TaskId, args.SecureParams, driver, credentialsFactory, args.Callback, counters ? counters : args.TaskCounters, - pqGateway + pqGateway, + enableStreamingQueriesCounters ); }); } diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h index 3e5e17c80a03..40a904cc4586 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h @@ -33,8 +33,9 @@ std::pair CreateDqPqWriteActor( IDqComputeActorAsyncOutput::ICallbacks* callbacks, const ::NMonitoring::TDynamicCounterPtr& counters, IPqGateway::TPtr pqGateway, + bool enableStreamingQueriesCounters, i64 freeSpace = DqPqDefaultFreeSpace); -void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>()); +void RegisterDqPqWriteActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>(), bool enableStreamingQueriesCounters = true); } // namespace NYql::NDq diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index 757ff49e9821..ccfb4329479c 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -74,7 +74,8 @@ class TFixture : public TPqIoTestFixture { actor.GetHolderFactory(), MakeIntrusive(), freeSpace, - CreateMockPqGateway({.Runtime = CaSetup->Runtime.get()}) + CreateMockPqGateway({.Runtime = CaSetup->Runtime.get()}), + true ); actor.InitAsyncInput(dqAsyncInput, dqAsyncInputAsActor); diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp index 26a83c2776ec..731961440611 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_read_actor_ut.cpp @@ -57,6 +57,7 @@ class TFixture : public TPqIoTestFixture { MakeIntrusive(), CreatePqNativeGateway(std::move(pqServices)), 1, + true, freeSpace ); diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.cpp b/ydb/tests/fq/pq_async_io/ut_helpers.cpp index 7098cafe40fd..937bb429ff31 100644 --- a/ydb/tests/fq/pq_async_io/ut_helpers.cpp +++ b/ydb/tests/fq/pq_async_io/ut_helpers.cpp @@ -99,6 +99,7 @@ void TPqIoTestFixture::InitAsyncOutput( &actor.GetAsyncOutputCallbacks(), MakeIntrusive(), CreatePqNativeGateway(std::move(pqServices)), + true, freeSpace); actor.InitAsyncOutput(dqAsyncOutput, dqAsyncOutputAsActor); diff --git a/ydb/tests/fq/streaming/conftest.py b/ydb/tests/fq/streaming/conftest.py index 41d2742f68a6..3c8dafcdf590 100644 --- a/ydb/tests/fq/streaming/conftest.py +++ b/ydb/tests/fq/streaming/conftest.py @@ -16,7 +16,8 @@ def get_ydb_config(): erasure=Erasure.MIRROR_3_DC, extra_feature_flags={ "enable_external_data_sources": True, - "enable_streaming_queries": True + "enable_streaming_queries": True, + "enable_streaming_queries_counters": True }, query_service_config={ "available_external_data_sources": ["ObjectStorage", "Ydb", "YdbTopics"],