diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 411f68af5c3e..ef614a4d60da 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -44,7 +44,8 @@ class TDqExecuter: public TRichActor, NYql::TCounters { const TDqConfiguration::TPtr& settings, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TInstant requestStartTime, - bool createTaskSuspended) + bool createTaskSuspended, + ui64 executionTimeout) : TRichActor(&TDqExecuter::Handler) , GwmActorId(gwmActorId) , PrinterId(printerId) @@ -61,6 +62,7 @@ class TDqExecuter: public TRichActor, NYql::TCounters { , AllocationHistogram(Counters->GetSubgroup("component", "ServiceProxyActorHistograms")->GetHistogram("WorkersAllocationTime", ExponentialHistogram(10, 2, 1))) , TasksHistogram(Counters->GetSubgroup("component", "ServiceProxyActorHistograms")->GetHistogram("TasksCount", ExponentialHistogram(10, 2, 1))) , CreateTaskSuspended(createTaskSuspended) + , ExecutionTimeout(executionTimeout) { } ~TDqExecuter() { @@ -239,9 +241,7 @@ class TDqExecuter: public TRichActor, NYql::TCounters { allocateRequest.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession)); - Timeout = tasks.size() == 1 - ? TDuration::MilliSeconds(Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout)) - : TDuration::MilliSeconds(Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout)); + Timeout = TDuration::MilliSeconds(ExecutionTimeout > 0 ? ExecutionTimeout : TDqSettings::TDefault::TableTimeout); YQL_CLOG(DEBUG, ProviderDq) << "Dq timeouts are set to: " << ToString(Timeout) << " (global), " @@ -514,6 +514,7 @@ class TDqExecuter: public TRichActor, NYql::TCounters { bool CreateTaskSuspended; bool Finished = false; NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_FULL; + ui64 ExecutionTimeout; }; NActors::IActor* MakeDqExecuter( @@ -523,9 +524,10 @@ NActors::IActor* MakeDqExecuter( const TDqConfiguration::TPtr& settings, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TInstant requestStartTime, - bool createTaskSuspended + bool createTaskSuspended, + ui64 executionTimeout ) { - return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended), traceId); + return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended, executionTimeout), traceId); } } // namespace NDq diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.h b/ydb/library/yql/providers/dq/actors/executer_actor.h index 0a453d513396..88dc1022fdb7 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.h +++ b/ydb/library/yql/providers/dq/actors/executer_actor.h @@ -15,7 +15,8 @@ NActors::IActor* MakeDqExecuter( const TDqConfiguration::TPtr& settings, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TInstant requestStartTime = TInstant::Now(), - bool createTaskSuspended = false + bool createTaskSuspended = false, + ui64 executionTimeout = 0 ); } // namespace NDq diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index 194e2201276c..d569fba231b8 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -88,6 +88,7 @@ message ExecuteGraphRequest { string RateLimiterResource = 15; map CommonTaskParams = 16; // to be merged into each task TTaskMeta.TaskParams NYql.NDqProto.EDqStatsMode StatsMode = 17; + uint64 ExecutionTimeout = 18; // in milliseconds } message ExecuteGraphResponse { diff --git a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp index ac5552080c78..78b5a2f22576 100644 --- a/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp +++ b/ydb/library/yql/providers/dq/local_gateway/yql_dq_gateway_local.cpp @@ -126,6 +126,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this ModulesMapping; bool Discard; NThreading::TPromise Result; + ui64 ExecutionTimeout; }; public: @@ -148,13 +149,13 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const IDqGateway::TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) + bool discard, ui64 executionTimeout) { NThreading::TFuture result; { TGuard lock(Mutex); - Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise()}); + Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise(), executionTimeout}); result = Queue.back().Result; } @@ -177,7 +178,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_thisExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard) + Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard, request.ExecutionTimeout) .Apply([promise=request.Result, weak](const NThreading::TFuture& result) mutable { try { promise.SetValue(result.GetValue()); @@ -228,10 +229,10 @@ class TDqGatewayLocal : public IDqGateway { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) override + bool discard, ui64 executionTimeout) override { return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, - settings, progressWriter, modulesMapping, discard); + settings, progressWriter, modulesMapping, discard, executionTimeout); } void Stop() override { diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index f8d44a82f4cb..5fed921bd9f9 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -875,6 +875,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters } TInstant startTime = TInstant::Now(); + State->ExecutionTimeout = State->Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout); try { auto result = TMaybeNode(input).Cast(); @@ -1264,6 +1265,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters YQL_CLOG(TRACE, ProviderDq) << "HandlePull " << NCommon::ExprToPrettyString(ctx, *input); TInstant startTime = TInstant::Now(); + State->ExecutionTimeout = State->Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout); auto pull = TPull(input); THashMap pullSettings; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp index 8ad08f347fe2..cc81a0dbc7f8 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp @@ -241,7 +241,7 @@ class TDqGatewaySession: public std::enable_shared_from_this const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) + bool discard, ui64 executionTimeout) { YQL_LOG_CTX_ROOT_SESSION_SCOPE(SessionId); @@ -257,6 +257,7 @@ class TDqGatewaySession: public std::enable_shared_from_this YQL_ENSURE(!file.GetObjectId().empty()); } } + queryPB.SetExecutionTimeout(executionTimeout); queryPB.SetSession(SessionId); queryPB.SetResultType(plan.ResultType); queryPB.SetSourceId(plan.SourceID.NodeId()-1); @@ -521,7 +522,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) + bool discard, ui64 executionTimeout) { std::shared_ptr session; with_lock(Mutex) { @@ -534,7 +535,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this { YQL_CLOG(ERROR, ProviderDq) << "Session was closed: " << sessionId; return MakeFuture(NCommon::ResultFromException(yexception() << "Session was closed")); } - return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard) + return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout) .Apply([](const TFuture& f) { try { f.TryRethrow(); @@ -586,9 +587,9 @@ class TDqGateway: public IDqGateway { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) override + bool discard, ui64 executionTimeout) override { - return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard); + return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout); } TString GetVanillaJobPath() override { diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h index 42d07c21f096..798169bf7adc 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_gateway.h @@ -75,7 +75,7 @@ class IDqGateway : public TThrRefBase { const THashMap& secureParams, const THashMap& graphParams, const TDqSettings::TPtr& settings, const TDqProgressWriter& progressWriter, const THashMap& modulesMapping, - bool discard) = 0; + bool discard, ui64 executionTimeout) = 0; virtual TString GetVanillaJobPath() { return ""; diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h index 5a369805efc5..9f0d51eff609 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_state.h @@ -41,6 +41,7 @@ struct TDqState: public TThrRefBase { std::function AbortHidden = [](){}; NThreading::TAsyncSemaphore::TPtr OperationSemaphore = nullptr; // pragmas are not yet parsed, so we initialize it later TAdaptiveLock Mutex_; + ui64 ExecutionTimeout; TDqState( const IDqGateway::TPtr& dqGateway, @@ -91,7 +92,7 @@ struct TDqState: public TThrRefBase { } return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard](const auto& f) mutable { auto lock = f.GetValue()->MakeAutoRelease(); - return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard).Apply([unlock = lock.DeferRelease()](const auto& f) { + return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, this_->ExecutionTimeout).Apply([unlock = lock.DeferRelease()](const auto& f) { unlock(NThreading::MakeFuture()); return f; }); diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index b95fcc344c40..96c715bdd316 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -276,6 +276,7 @@ namespace NYql::NDqs { : TServiceProxyActor(ctx, counters, traceId, username) , GraphExecutionEventsActorId(graphExecutionEventsActorId) { + ExecutionTimeout = Request->GetExecutionTimeout(); } void DoRetry() override { @@ -378,7 +379,7 @@ namespace NYql::NDqs { YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__; MergeTaskMetas(params); - auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); + auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime, ExecutionTimeout)); TVector columns; columns.reserve(Request->GetColumns().size()); @@ -427,6 +428,7 @@ namespace NYql::NDqs { } NActors::TActorId GraphExecutionEventsActorId; + ui64 ExecutionTimeout; }; TString GetVersionString() {