diff --git a/ydb/core/kqp/counters/kqp_counters.cpp b/ydb/core/kqp/counters/kqp_counters.cpp index 579cb60d93ab..2d37d1cad346 100644 --- a/ydb/core/kqp/counters/kqp_counters.cpp +++ b/ydb/core/kqp/counters/kqp_counters.cpp @@ -896,11 +896,7 @@ TKqpCounters::TKqpCounters(const ::NMonitoring::TDynamicCounterPtr& counters, co /* Statistics batch operations */ BatchOperationUpdateRows = KqpGroup->GetCounter("BatchOperation/Update/Rows", true); - BatchOperationUpdateBytes = KqpGroup->GetCounter("BatchOperation/Update/Bytes", true); - BatchOperationDeleteRows = KqpGroup->GetCounter("BatchOperation/Delete/Rows", true); - BatchOperationDeleteBytes = KqpGroup->GetCounter("BatchOperation/Delete/Bytes", true); - BatchOperationRetries = KqpGroup->GetCounter("BatchOperation/Retries", true); } diff --git a/ydb/core/kqp/counters/kqp_counters.h b/ydb/core/kqp/counters/kqp_counters.h index e8edd0f08b2f..5d0c8ba38758 100644 --- a/ydb/core/kqp/counters/kqp_counters.h +++ b/ydb/core/kqp/counters/kqp_counters.h @@ -487,9 +487,7 @@ class TKqpCounters : public TKqpCountersBase, public NYql::NDq::TSpillingCounter // Statistics batch operations ::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationUpdateRows; - ::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationUpdateBytes; ::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationDeleteRows; - ::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationDeleteBytes; ::NMonitoring::TDynamicCounters::TCounterPtr BatchOperationRetries; }; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index e6fc3e80236a..52abf7d6dc31 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -2175,10 +2175,7 @@ class TKqpExecuterBase : public TActor { if (!BatchOperationSettings.Empty() && !Stats->TableStats.empty()) { auto [_, tableStats] = *Stats->TableStats.begin(); Counters->Counters->BatchOperationUpdateRows->Add(tableStats->GetWriteRows()); - Counters->Counters->BatchOperationUpdateBytes->Add(tableStats->GetWriteBytes()); - Counters->Counters->BatchOperationDeleteRows->Add(tableStats->GetEraseRows()); - Counters->Counters->BatchOperationDeleteBytes->Add(tableStats->GetEraseBytes()); } auto finishSize = Stats->EstimateFinishMem(); diff --git a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp index 5a37adb99e13..a6718ab43290 100644 --- a/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_partitioned_executer.cpp @@ -29,7 +29,7 @@ namespace { #define PE_LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, LogPrefix() << msg) #define PE_LOG_T(msg) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, LogPrefix() << msg) -/* +/** * TKqpPartitionedExecuter only executes BATCH UPDATE/DELETE queries * with the idempotent set of updates (except primary key), without RETURNING, * only for row tables and without any joins or subqueries. @@ -87,8 +87,8 @@ class TKqpPartitionedExecuter : public TActorBootstrappedGetTypeRewrite()); } } catch (...) { - RuntimeError( - Ydb::StatusIds::INTERNAL_ERROR, - NYql::TIssues({NYql::TIssue(TStringBuilder() - << "KqpPartitionedExecuterActor got an unknown error, state = " << CurrentStateFuncName())})); + AbortWithError(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(TStringBuilder() + << "KqpPartitionedExecuterActor got an unknown error, state = " << CurrentStateFuncName())})); } } void HandlePrepare(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { auto* request = ev->Get()->Request.Get(); - PE_LOG_D("Got TEvTxProxySchemeCache::TEvResolveKeySetResult from ActorId = " << ev->Sender); + PE_LOG_D("Got TEvResolveKeySetResult from actorId = " << ev->Sender); if (request->ErrorCount > 0) { - return RuntimeError( - Ydb::StatusIds::INTERNAL_ERROR, - NYql::TIssues({NYql::TIssue(TStringBuilder() - << "KqpPartitionedExecuterActor could not resolve a partitioning of the table, state = " << CurrentStateFuncName())})); + PE_LOG_E("Failed to resolve table partitioning, errorCount = " << request->ErrorCount); + AbortWithError(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(TStringBuilder() + << "KqpPartitionedExecuterActor could not resolve a partitioning of the table, state = " << CurrentStateFuncName())})); + return; } YQL_ENSURE(request->ResultSet.size() == 1); @@ -139,29 +137,24 @@ class TKqpPartitionedExecuter : public TActorBootstrappedGet()->GetIssues(); auto it = ExecuterToPartition.find(ev->Sender); + if (it != ExecuterToPartition.end()) { - PE_LOG_D("Got TEvKqp::EvAbortExecution from ActorId = " << ev->Sender - << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", message: " << issues.ToOneLineString() << ", abort child executers"); + PE_LOG_W("Got TEvAbortExecution from actorId = " << ev->Sender + << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) + << ", message: " << issues.ToOneLineString()); auto [_, partInfo] = *it; - AbortBuffer(partInfo->ExecuterId); + AbortBuffer(partInfo->BufferId); ForgetExecuterAndBuffer(partInfo); ForgetPartition(partInfo); } else { - PE_LOG_D("Got TEvKqp::TEvAbortExecution from unknown actor with Id = " << ev->Sender + PE_LOG_D("Got TEvAbortExecution from ActorId = " << ev->Sender << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", message: " << issues.ToOneLineString() << ", ignore"); + << ", message: " << issues.ToOneLineString() + << ", isSessionActor: " << (ev->Sender == SessionActorId)); } - if (ReturnStatus == Ydb::StatusIds::SUCCESS) { - ReturnStatus = Ydb::StatusIds::ABORTED; - ReturnIssues.AddIssues(issues); - ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder() - << "aborting by KqpPartitionedExecuterActor")); - } - - Abort(); + AbortWithError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), std::move(issues)); } STFUNC(ExecuteState) { @@ -175,10 +168,8 @@ class TKqpPartitionedExecuter : public TActorBootstrappedGetTypeRewrite()); } } catch (...) { - RuntimeError( - Ydb::StatusIds::INTERNAL_ERROR, - NYql::TIssues({NYql::TIssue(TStringBuilder() - << "KqpPartitionedExecuterActor got an unknown error, state = " << CurrentStateFuncName())})); + AbortWithError(Ydb::StatusIds::INTERNAL_ERROR, NYql::TIssues({NYql::TIssue(TStringBuilder() + << "KqpPartitionedExecuterActor got an unknown error, state = " << CurrentStateFuncName())})); } } @@ -187,20 +178,23 @@ class TKqpPartitionedExecuter : public TActorBootstrappedSender); if (it == ExecuterToPartition.end()) { - PE_LOG_D("Got TEvKqpExecuter::TEvTxResponse from unknown actor with Id = " << ev->Sender + PE_LOG_D("Got TEvTxResponse from unknown actor with actorId = " << ev->Sender << ", status = " << response->GetStatus() << ", ignore"); return; } - PE_LOG_I("Got TEvKqpExecuter::TEvTxResponse from ActorId = " << ev->Sender + auto [_, partInfo] = *it; + + PE_LOG_D("Got TEvTxResponse from actorId = " << ev->Sender + << ", partitionIndex = " << partInfo->PartitionIndex << ", status = " << response->GetStatus()); - auto [_, partInfo] = *it; AbortBuffer(partInfo->BufferId); ForgetExecuterAndBuffer(partInfo); switch (response->GetStatus()) { case Ydb::StatusIds::SUCCESS: + PE_LOG_I("Partition " << partInfo->PartitionIndex << " completed successfully"); partInfo->RetryDelayMs = Settings.StartRetryDelayMs; partInfo->LimitSize = std::min(partInfo->LimitSize * 2, Settings.MaxBatchSize); return OnSuccessResponse(partInfo, ev->Get()); @@ -209,6 +203,7 @@ class TKqpPartitionedExecuter : public TActorBootstrappedPartitionIndex << " will be retried, status = " << response->GetStatus()); return ScheduleRetryWithNewLimit(partInfo); default: break; @@ -216,17 +211,18 @@ class TKqpPartitionedExecuter : public TActorBootstrappedGetIssues(), ReturnIssues); - ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder() - << "while executing by KqpPartitionedExecuterActor")); - - RuntimeError(response->GetStatus(), ReturnIssues); + NYql::TIssues issues; + NYql::IssuesFromMessage(response->GetIssues(), issues); + PE_LOG_W("Partition " << partInfo->PartitionIndex << " failed with status = " << response->GetStatus() + << ", message: " << issues.ToOneLineString()); + AbortWithError(response->GetStatus(), std::move(issues)); } void HandleExecute(TEvKqpExecuter::TEvTxDelayedExecution::TPtr& ev) { RequestCounters->Counters->BatchOperationRetries->Inc(); auto& partInfo = StartedPartitions[ev->Get()->PartitionIdx]; + PE_LOG_D("Delayed execution timer fired for partitionIndex = " << ev->Get()->PartitionIdx); RetryPartExecution(partInfo); } @@ -235,15 +231,18 @@ class TKqpPartitionedExecuter : public TActorBootstrappedSender); if (it == BufferToPartition.end()) { - PE_LOG_D("Got TEvKqpBuffer::TEvError from unknown actor with Id = " << ev->Sender << ", status = " - << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << ", ignore"); + PE_LOG_D("Got TEvError from unknown buffer with actorId = " << ev->Sender << ", status = " + << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << ", ignore"); return; } - PE_LOG_D("Got TEvKqpBuffer::TEvError from ActorId = " << ev->Sender << ", status = " - << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode)); - auto [_, partInfo] = *it; + + PE_LOG_W("Got TEvError from buffer actorId = " << ev->Sender + << ", partitionIndex = " << partInfo->PartitionIndex + << ", status = " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) + << ", message: " << msg.Issues.ToOneLineString()); + AbortExecuter(partInfo->ExecuterId, "got error from KqpBufferWriteActor"); ForgetExecuterAndBuffer(partInfo); @@ -256,19 +255,14 @@ class TKqpPartitionedExecuter : public TActorBootstrappedPartitionIndex << " buffer error, will retry"); return ScheduleRetryWithNewLimit(partInfo); default: break; } ForgetPartition(partInfo); - - ReturnIssues.AddIssues(msg.Issues); - - RuntimeError( - Ydb::StatusIds::INTERNAL_ERROR, - NYql::TIssues({NYql::TIssue(TStringBuilder() - << "while executing by KqpPartitionedExecuterActor")})); + AbortWithError(NYql::NDq::DqStatusToYdbStatus(msg.StatusCode), msg.Issues); } STFUNC(AbortState) { @@ -279,13 +273,11 @@ class TKqpPartitionedExecuter : public TActorBootstrappedBufferId); ForgetExecuterAndBuffer(partInfo); ForgetPartition(partInfo); if (CheckExecutersAreFinished()) { - PE_LOG_I("All executers have been finished, abort KqpPartitionedExecuterActor"); - RuntimeError(ReturnStatus, ReturnIssues); + PE_LOG_N("All executers have been finished, replying with error: " << Ydb::StatusIds_StatusCode_Name(ReturnStatus)); + ReplyErrorAndDie(ReturnStatus, ReturnIssues); } } @@ -318,25 +312,54 @@ class TKqpPartitionedExecuter : public TActorBootstrappedSender); if (it == ExecuterToPartition.end()) { - PE_LOG_D("Got TEvKqp::EvAbortExecution from unknown actor with Id = " << ev->Sender - << " , status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) + PE_LOG_D("Got TEvAbortExecution in AbortState from unknown actor with actorId = " << ev->Sender + << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << ", message: " << issues.ToOneLineString() << ", ignore"); return; } - PE_LOG_D("Got TEvKqp::EvAbortExecution from ActorId = " << ev->Sender - << " , status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) - << ", message: " << issues.ToOneLineString()); - auto [_, partInfo] = *it; + + PE_LOG_D("Got TEvAbortExecution in AbortState from actorId = " << ev->Sender + << ", partitionIndex = " << partInfo->PartitionIndex + << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) + << ", issues: " << issues.ToOneLineString() << ", finishing partition"); + AbortBuffer(partInfo->BufferId); ForgetExecuterAndBuffer(partInfo); + ForgetPartition(partInfo); + + if (CheckExecutersAreFinished()) { + PE_LOG_N("All executers have been finished, replying with error: " << Ydb::StatusIds_StatusCode_Name(ReturnStatus)); + ReplyErrorAndDie(ReturnStatus, ReturnIssues); + } } void HandleAbort(TEvKqpBuffer::TEvError::TPtr& ev) { const auto& msg = *ev->Get(); - PE_LOG_D("Got TEvError from KqpBufferWriteActor with Id = " << ev->Sender << ", status = " + + auto it = BufferToPartition.find(ev->Sender); + if (it == BufferToPartition.end()) { + PE_LOG_D("Got TEvError in AbortState from unknown buffer with actorId = " << ev->Sender << ", status = " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) << ", ignore"); + return; + } + + auto [_, partInfo] = *it; + + PE_LOG_D("Got TEvError in AbortState from buffer actorId = " << ev->Sender + << ", partitionIndex = " << partInfo->PartitionIndex + << ", status = " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.StatusCode) + << ", finishing partition"); + + AbortExecuter(partInfo->ExecuterId, "got error from KqpBufferWriteActor"); + ForgetExecuterAndBuffer(partInfo); + ForgetPartition(partInfo); + + if (CheckExecutersAreFinished()) { + PE_LOG_N("All executers have been finished, replying with error: " << Ydb::StatusIds_StatusCode_Name(ReturnStatus)); + ReplyErrorAndDie(ReturnStatus, ReturnIssues); + } } TString LogPrefix() const { @@ -415,8 +438,6 @@ class TKqpPartitionedExecuter : public TActorBootstrapped minKey(KeyIds.size()); const TTableRange range(minKey, true, {}, false, false); @@ -435,6 +456,9 @@ class TKqpPartitionedExecuter : public TActorBootstrappedsize() << " partitions, starting first " + << std::min(Settings.PartitionExecutionLimit, TablePartitioning->size()) << " in parallel"); + Become(&TKqpPartitionedExecuter::ExecuteState); auto partCount = std::min(Settings.PartitionExecutionLimit, TablePartitioning->size()); @@ -508,35 +532,35 @@ class TKqpPartitionedExecuter : public TActorBootstrappedBufferId = bufferActorId; ExecuterToPartition[exId] = BufferToPartition[bufferActorId] = partInfo; - PE_LOG_I("Create new KQP executer by KqpPartitionedExecuterActor: ExecuterId = " << partInfo->ExecuterId - << ", PartitionIndex = " << partitionIndex << ", LimitSize = " << partInfo->LimitSize - << ", RetryDelayMs = " << partInfo->RetryDelayMs); + PE_LOG_D("Created " << (isRetry ? "retry" : "new") << " executer for partitionIndex = " << partitionIndex + << ", executerId = " << partInfo->ExecuterId + << ", bufferId = " << bufferActorId + << ", limitSize = " << partInfo->LimitSize); auto ev = std::make_unique(exId); Send(MakeTxProxyID(), ev.release()); } void Abort() { + PE_LOG_W("Entering AbortState, returnStatus = " << Ydb::StatusIds_StatusCode_Name(ReturnStatus) + << ", active partitionsCount = " << StartedPartitions.size()); + Become(&TKqpPartitionedExecuter::AbortState); if (CheckExecutersAreFinished()) { - PE_LOG_I("All executers have been finished, abort KqpPartitionedExecuterActor"); - return RuntimeError(ReturnStatus, ReturnIssues); + PE_LOG_N("All executers have been finished, replying with error immediately"); + ReplyErrorAndDie(ReturnStatus, ReturnIssues); + return; } - SendAbortToExecuters(); - } - - void SendAbortToExecuters() { - PE_LOG_I("Send abort to executers"); - + PE_LOG_I("Sending abort to " << ExecuterToPartition.size() << " executers"); for (auto& [exId, partInfo] : ExecuterToPartition) { - AbortExecuter(exId, "runtime error"); + AbortExecuter(exId, ReturnIssues.ToOneLineString()); } } void AbortExecuter(TActorId id, const TString& reason) { - auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Aborted by KqpPartitionedExecuterActor, reason: " + reason); + auto abortEv = TEvKqp::TEvAbortExecution::Aborted("Aborted by KqpPartitionedExecuterActor: " + reason); Send(id, abortEv.Release()); } @@ -557,32 +581,35 @@ class TKqpPartitionedExecuter : public TActorBootstrappedBatchOperationMaxKeys), std::move(ev->BatchOperationKeyIds)); if (minKey) { if (!IsKeyInPartition(minKey.GetCells(), partInfo)) { - ReturnStatus = Ydb::StatusIds::PRECONDITION_FAILED; - ReturnIssues.AddIssue(NYql::TIssue(TStringBuilder() - << "The next key from KqpReadActor does not belong to the partition with PartitionIndex = " - << partInfo->PartitionIndex)); + PE_LOG_E("Partition " << partInfo->PartitionIndex << " returned key outside its range"); ForgetPartition(partInfo); - return Abort(); + AbortWithError(Ydb::StatusIds::PRECONDITION_FAILED, NYql::TIssues({NYql::TIssue(TStringBuilder() + << "The next key from KqpReadActor does not belong to the partition with partitionIndex = " + << partInfo->PartitionIndex)})); + return; } + PE_LOG_D("Partition " << partInfo->PartitionIndex << " has more data, continue processing"); partInfo->BeginRange = TKeyDesc::TPartitionRangeInfo(minKey, /* IsInclusive */ false, /* IsPoint */ false); return RetryPartExecution(partInfo); } + PE_LOG_D("Partition " << partInfo->PartitionIndex << " finished completely"); ForgetPartition(partInfo); if (NextPartitionIndex < TablePartitioning->size()) { + PE_LOG_D("Starting next partition " << NextPartitionIndex << " of " << TablePartitioning->size()); return CreateExecuterWithBuffer(NextPartitionIndex++, /* isRetry */ false); } if (CheckExecutersAreFinished()) { - auto& response = *ResponseEv->Record.MutableResponse(); - response.SetStatus(ReturnStatus); - - PE_LOG_I("All executers have been finished. Send SUCCESS to SessionActor"); - - Send(SessionActorId, ResponseEv.release()); - PassAway(); + if (ReturnStatus != Ydb::StatusIds::SUCCESS) { + PE_LOG_N("All partitions processed, but have error: " << Ydb::StatusIds_StatusCode_Name(ReturnStatus)); + ReplyErrorAndDie(ReturnStatus, ReturnIssues); + } else { + PE_LOG_I("All partitions processed successfully"); + ReplySuccessAndDie(); + } } } @@ -596,46 +623,48 @@ class TKqpPartitionedExecuter : public TActorBootstrappedPartitionIndex + << ", limitSize = " << partInfo->LimitSize + << ", retryDelayMs = " << partInfo->RetryDelayMs); return CreateExecuterWithBuffer(partInfo->PartitionIndex, /* isRetry */ true); } + PE_LOG_D("Partition " << partInfo->PartitionIndex << " retry cancelled due to AbortState"); ForgetPartition(partInfo); if (CheckExecutersAreFinished()) { - PE_LOG_I("All executers have been finished, abort KqpPartitionedExecuterActor"); - RuntimeError(ReturnStatus, ReturnIssues); + PE_LOG_N("All executers have been finished, replying with error: " << Ydb::StatusIds_StatusCode_Name(ReturnStatus)); + ReplyErrorAndDie(ReturnStatus, ReturnIssues); } } void ScheduleRetryWithNewLimit(TBatchPartitionInfo::TPtr& partInfo) { if (partInfo->RetryDelayMs == Settings.MaxRetryDelayMs) { + PE_LOG_E("Partition " << partInfo->PartitionIndex << " reached maximum retry delay (" + << Settings.MaxRetryDelayMs << " ms), giving up"); ForgetPartition(partInfo); - - if (this->CurrentStateFunc() != &TKqpPartitionedExecuter::AbortState) { - RuntimeError( - Ydb::StatusIds::UNAVAILABLE, - NYql::TIssues({NYql::TIssue(TStringBuilder() - << "cannot retry query execution because the maximum retry delay has been reached")})); - } - + auto issues = NYql::TIssues({ + NYql::TIssue(TStringBuilder() << "Cannot retry query execution because the maximum retry delay has been reached"), + }); + AbortWithError(Ydb::StatusIds::UNAVAILABLE, std::move(issues)); return; } - auto newLimit = std::max(partInfo->LimitSize / 2, Settings.MinBatchSize); - partInfo->LimitSize = newLimit; - - auto ev = std::make_unique(partInfo->PartitionIndex); - Schedule(TDuration::MilliSeconds(partInfo->RetryDelayMs), ev.release()); - - // We use the init delay value first and change it for the next attempt auto decJitterDelay = RandomProvider->Uniform(Settings.StartRetryDelayMs, partInfo->RetryDelayMs * 3ul); auto newDelay = std::min(Settings.MaxRetryDelayMs, decJitterDelay); + auto oldLimit = partInfo->LimitSize; + auto oldDelay = partInfo->RetryDelayMs; partInfo->RetryDelayMs = newDelay; + partInfo->LimitSize = std::max(partInfo->LimitSize / 2, Settings.MinBatchSize); + + PE_LOG_N("Scheduling retry for partition " << partInfo->PartitionIndex + << ", delay: " << oldDelay << " -> " << partInfo->RetryDelayMs << " ms" + << ", batch size: " << oldLimit << " -> " << partInfo->LimitSize); + + auto ev = std::make_unique(partInfo->PartitionIndex); + Schedule(TDuration::MilliSeconds(partInfo->RetryDelayMs), ev.release()); } bool CheckExecutersAreFinished() const { @@ -702,15 +731,25 @@ class TKqpPartitionedExecuter : public TActorBootstrappedCurrentStateFunc() != &TKqpPartitionedExecuter::AbortState) { - ReturnStatus = code; - return Abort(); + if (CheckExecutersAreFinished()) { + PE_LOG_N("All executers have been finished, replying with error immediately"); + ReplyErrorAndDie(ReturnStatus, ReturnIssues); + } + return; } - ReplyErrorAndDie(code, issues); + PE_LOG_E("First error occurred: " << Ydb::StatusIds_StatusCode_Name(code) + << ", issues: " << issues.ToOneLineString()); + + ReturnStatus = code; + ReturnIssues.AddIssues(issues); + ReturnIssues.AddIssue(TStringBuilder() << "while executing BATCH UPDATE/DELETE by KqpPartitionedExecuterActor"); + Abort(); } void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, const NYql::TIssues& issues) { @@ -719,12 +758,6 @@ class TKqpPartitionedExecuter : public TActorBootstrapped issues; - IssueToMessage(issue, issues.Add()); - ReplyErrorAndDie(status, &issues); - } - void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, google::protobuf::RepeatedPtrField* issues) { @@ -737,6 +770,13 @@ class TKqpPartitionedExecuter : public TActorBootstrappedRecord.MutableResponse(); + response.SetStatus(Ydb::StatusIds::SUCCESS); + Send(SessionActorId, ResponseEv.release()); + PassAway(); + } + private: IKqpGateway::TExecPhysicalRequest Request; std::unique_ptr ResponseEv; @@ -783,8 +823,7 @@ class TKqpPartitionedExecuter : public TActorBootstrapped