Skip to content

Commit

Permalink
YQL-16355: Improve timeout settting for literal requests
Browse files Browse the repository at this point in the history
  • Loading branch information
mxkovalev committed May 16, 2024
1 parent 14ca1e8 commit 17b4404
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 26 deletions.
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/gateway/empty_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ class TEmptyGateway : public NYql::IDqGateway {
const NYql::TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter,
const THashMap<TString, TString>& modulesMapping,
bool discard) override
bool discard,
ui64 executionTimeout) override
{
Y_UNUSED(progressWriter);
Y_UNUSED(modulesMapping); // TODO: support.
Y_UNUSED(discard);
Y_UNUSED(executionTimeout);

NProto::TGraphParams params;
THashMap<i64, TString> stagePrograms;
Expand Down
14 changes: 8 additions & 6 deletions ydb/library/yql/providers/dq/actors/executer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
const TDqConfiguration::TPtr& settings,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
TInstant requestStartTime,
bool createTaskSuspended)
bool createTaskSuspended,
ui64 executionTimeout)
: TRichActor<TDqExecuter>(&TDqExecuter::Handler)
, GwmActorId(gwmActorId)
, PrinterId(printerId)
Expand All @@ -61,6 +62,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
, AllocationHistogram(Counters->GetSubgroup("component", "ServiceProxyActorHistograms")->GetHistogram("WorkersAllocationTime", ExponentialHistogram(10, 2, 1)))
, TasksHistogram(Counters->GetSubgroup("component", "ServiceProxyActorHistograms")->GetHistogram("TasksCount", ExponentialHistogram(10, 2, 1)))
, CreateTaskSuspended(createTaskSuspended)
, ExecutionTimeout(executionTimeout)
{ }

~TDqExecuter() {
Expand Down Expand Up @@ -239,9 +241,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
allocateRequest.Release(),
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession));

Timeout = tasks.size() == 1
? TDuration::MilliSeconds(Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout))
: TDuration::MilliSeconds(Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout));
Timeout = TDuration::MilliSeconds(ExecutionTimeout > 0 ? ExecutionTimeout : TDqSettings::TDefault::TableTimeout);

YQL_CLOG(DEBUG, ProviderDq) << "Dq timeouts are set to: "
<< ToString(Timeout) << " (global), "
Expand Down Expand Up @@ -514,6 +514,7 @@ class TDqExecuter: public TRichActor<TDqExecuter>, NYql::TCounters {
bool CreateTaskSuspended;
bool Finished = false;
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_FULL;
ui64 ExecutionTimeout;
};

NActors::IActor* MakeDqExecuter(
Expand All @@ -523,9 +524,10 @@ NActors::IActor* MakeDqExecuter(
const TDqConfiguration::TPtr& settings,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
TInstant requestStartTime,
bool createTaskSuspended
bool createTaskSuspended,
ui64 executionTimeout
) {
return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended), traceId);
return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended, executionTimeout), traceId);
}

} // namespace NDq
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/dq/actors/executer_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ NActors::IActor* MakeDqExecuter(
const TDqConfiguration::TPtr& settings,
const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
TInstant requestStartTime = TInstant::Now(),
bool createTaskSuspended = false
bool createTaskSuspended = false,
ui64 executionTimeout = 0
);

} // namespace NDq
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/dq/api/protos/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ message ExecuteGraphRequest {
string RateLimiterResource = 15;
map<string, bytes> CommonTaskParams = 16; // to be merged into each task TTaskMeta.TaskParams
NYql.NDqProto.EDqStatsMode StatsMode = 17;
uint64 ExecutionTimeout = 18; // in milliseconds
}

message ExecuteGraphResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalIm
THashMap<TString, TString> ModulesMapping;
bool Discard;
NThreading::TPromise<IDqGateway::TResult> Result;
ui64 ExecutionTimeout;
};

public:
Expand All @@ -148,13 +149,13 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalIm
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard)
bool discard, ui64 executionTimeout)
{

NThreading::TFuture<IDqGateway::TResult> result;
{
TGuard<TMutex> lock(Mutex);
Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>()});
Queue.emplace_back(TRequest{sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, NThreading::NewPromise<IDqGateway::TResult>(), executionTimeout});
result = Queue.back().Result;
}

Expand All @@ -177,7 +178,7 @@ class TDqGatewayLocalImpl: public std::enable_shared_from_this<TDqGatewayLocalIm

auto weak = weak_from_this();

Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard)
Gateway->ExecutePlan(request.SessionId, std::move(request.Plan), request.Columns, request.SecureParams, request.GraphParams, request.Settings, request.ProgressWriter, request.ModulesMapping, request.Discard, request.ExecutionTimeout)
.Apply([promise=request.Result, weak](const NThreading::TFuture<IDqGateway::TResult>& result) mutable {
try {
promise.SetValue(result.GetValue());
Expand Down Expand Up @@ -228,10 +229,10 @@ class TDqGatewayLocal : public IDqGateway {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) override
bool discard, ui64 executionTimeout) override
{
return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams,
settings, progressWriter, modulesMapping, discard);
settings, progressWriter, modulesMapping, discard, executionTimeout);
}

void Stop() override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
}

TInstant startTime = TInstant::Now();
ExecutionTimeout = State->Settings->_LiteralTimeout.Get().GetOrElse(TDqSettings::TDefault::LiteralTimeout);

try {
auto result = TMaybeNode<TResult>(input).Cast();
Expand Down Expand Up @@ -1017,7 +1018,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
graphParams["Evaluation"] = ToString(!ctx.Step.IsDone(TExprStep::ExprEval));
future = State->ExecutePlan(
State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
settings, progressWriter, ModulesMapping, fillSettings.Discard, ExecutionTimeout);
}
}

Expand Down Expand Up @@ -1264,6 +1265,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
YQL_CLOG(TRACE, ProviderDq) << "HandlePull " << NCommon::ExprToPrettyString(ctx, *input);

TInstant startTime = TInstant::Now();
ExecutionTimeout = State->Settings->_TableTimeout.Get().GetOrElse(TDqSettings::TDefault::TableTimeout);
auto pull = TPull(input);

THashMap<TString, TString> pullSettings;
Expand Down Expand Up @@ -1463,7 +1465,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);

auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), columns, secureParams, graphParams,
settings, progressWriter, ModulesMapping, fillSettings.Discard);
settings, progressWriter, ModulesMapping, fillSettings.Discard, ExecutionTimeout);

future.Subscribe([publicIds, progressWriter = State->ProgressWriter](const NThreading::TFuture<IDqGateway::TResult>& completedFuture) {
YQL_ENSURE(!completedFuture.HasException());
Expand Down Expand Up @@ -1977,7 +1979,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
IDqGateway::TDqProgressWriter progressWriter = MakeDqProgressWriter(publicIds);

auto future = State->ExecutePlan(State->SessionId, executionPlanner->GetPlan(), {}, secureParams, graphParams,
settings, progressWriter, ModulesMapping, false);
settings, progressWriter, ModulesMapping, false, ExecutionTimeout);

executionPlanner.Destroy();

Expand Down Expand Up @@ -2082,6 +2084,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
mutable THashMap<TString, TString> ModulesMapping;

const ui64 MaxFileReadSize = 1_MB;
ui64 ExecutionTimeout = 0;
};

}
Expand Down
11 changes: 6 additions & 5 deletions ydb/library/yql/providers/dq/provider/yql_dq_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class TDqGatewaySession: public std::enable_shared_from_this<TDqGatewaySession>
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard)
bool discard, ui64 executionTimeout)
{
YQL_LOG_CTX_ROOT_SESSION_SCOPE(SessionId);

Expand All @@ -257,6 +257,7 @@ class TDqGatewaySession: public std::enable_shared_from_this<TDqGatewaySession>
YQL_ENSURE(!file.GetObjectId().empty());
}
}
queryPB.SetExecutionTimeout(executionTimeout);
queryPB.SetSession(SessionId);
queryPB.SetResultType(plan.ResultType);
queryPB.SetSourceId(plan.SourceID.NodeId()-1);
Expand Down Expand Up @@ -521,7 +522,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl> {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard)
bool discard, ui64 executionTimeout)
{
std::shared_ptr<TDqGatewaySession> session;
with_lock(Mutex) {
Expand All @@ -534,7 +535,7 @@ class TDqGatewayImpl: public std::enable_shared_from_this<TDqGatewayImpl> {
YQL_CLOG(ERROR, ProviderDq) << "Session was closed: " << sessionId;
return MakeFuture(NCommon::ResultFromException<TResult>(yexception() << "Session was closed"));
}
return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard)
return session->ExecutePlan(std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout)
.Apply([](const TFuture<TResult>& f) {
try {
f.TryRethrow();
Expand Down Expand Up @@ -586,9 +587,9 @@ class TDqGateway: public IDqGateway {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) override
bool discard, ui64 executionTimeout) override
{
return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard);
return Impl->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout);
}

TString GetVanillaJobPath() override {
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/dq/provider/yql_dq_gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class IDqGateway : public TThrRefBase {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) = 0;
bool discard, ui64 executionTimeout) = 0;

virtual TString GetVanillaJobPath() {
return "";
Expand Down
7 changes: 4 additions & 3 deletions ydb/library/yql/providers/dq/provider/yql_dq_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct TDqState: public TThrRefBase {
std::function<void()> AbortHidden = [](){};
NThreading::TAsyncSemaphore::TPtr OperationSemaphore = nullptr; // pragmas are not yet parsed, so we initialize it later
TAdaptiveLock Mutex_;
ui64 ExecutionTimeout;

TDqState(
const IDqGateway::TPtr& dqGateway,
Expand Down Expand Up @@ -82,16 +83,16 @@ struct TDqState: public TThrRefBase {
const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& graphParams,
const TDqSettings::TPtr& settings,
const IDqGateway::TDqProgressWriter& progressWriter, const THashMap<TString, TString>& modulesMapping,
bool discard) {
bool discard, ui64 executionTimeout) {
with_lock(Mutex_) {
if (!OperationSemaphore) {
const auto parallelOperationsLimit = Settings->ParallelOperationsLimit.Get().GetOrElse(TDqSettings::TDefault::ParallelOperationsLimit);
OperationSemaphore = NThreading::TAsyncSemaphore::Make(parallelOperationsLimit);
}
}
return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr<TDqState>(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard](const auto& f) mutable {
return OperationSemaphore->AcquireAsync().Apply([this_=TIntrusivePtr<TDqState>(this), sessionId, plan=std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout](const auto& f) mutable {
auto lock = f.GetValue()->MakeAutoRelease();
return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard).Apply([unlock = lock.DeferRelease()](const auto& f) {
return this_->DqGateway->ExecutePlan(sessionId, std::move(plan), columns, secureParams, graphParams, settings, progressWriter, modulesMapping, discard, executionTimeout).Apply([unlock = lock.DeferRelease()](const auto& f) {
unlock(NThreading::MakeFuture());
return f;
});
Expand Down
4 changes: 3 additions & 1 deletion ydb/library/yql/providers/dq/service/grpc_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ namespace NYql::NDqs {
: TServiceProxyActor(ctx, counters, traceId, username)
, GraphExecutionEventsActorId(graphExecutionEventsActorId)
{
ExecutionTimeout = Request->GetExecutionTimeout();
}

void DoRetry() override {
Expand Down Expand Up @@ -378,7 +379,7 @@ namespace NYql::NDqs {
YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__;
MergeTaskMetas(params);

auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime));
auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime, false, ExecutionTimeout));

TVector<TString> columns;
columns.reserve(Request->GetColumns().size());
Expand Down Expand Up @@ -427,6 +428,7 @@ namespace NYql::NDqs {
}

NActors::TActorId GraphExecutionEventsActorId;
ui64 ExecutionTimeout;
};

TString GetVersionString() {
Expand Down

0 comments on commit 17b4404

Please sign in to comment.