From a994b69c7ad93cf22f0739fa8d7b6be9af60ea10 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 30 Oct 2025 17:48:41 +0300 Subject: [PATCH] Fix overloaded EvWrite Prepare (#27964) --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 2 + ydb/core/kqp/ut/effects/kqp_overload_ut.cpp | 72 ++++++++++++++------- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 3167135cd68f..532dbf3f5abb 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -665,6 +665,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto metadata = ShardedWriteController->GetMessageMetadata(shardId); if (metadata && seqNo + 1 == metadata->NextOverloadSeqNo) { CA_LOG_D("Retry Overloaded ShardID=" << shardId); + ResetShardRetries(shardId, metadata->Cookie); SendDataToShard(shardId); } } @@ -1074,6 +1075,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto metadata = ShardedWriteController->GetMessageMetadata(shardId); YQL_ENSURE(metadata); + YQL_ENSURE(metadata->SendAttempts == 0 || InconsistentTx); if (metadata->SendAttempts >= MessageSettings.MaxWriteAttempts) { CA_LOG_W("ShardId=" << shardId << " for table '" << TablePath diff --git a/ydb/core/kqp/ut/effects/kqp_overload_ut.cpp b/ydb/core/kqp/ut/effects/kqp_overload_ut.cpp index 5330bf3bc406..eb3bd53b4f0f 100644 --- a/ydb/core/kqp/ut/effects/kqp_overload_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_overload_ut.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include @@ -15,7 +16,7 @@ using namespace NYdb::NQuery; Y_UNIT_TEST_SUITE(KqpOverload) { - Y_UNIT_TEST(OltpOverloaded) { + Y_UNIT_TEST_TWIN(OltpOverloaded, Distributed) { TKikimrSettings settings; settings.SetUseRealThreads(false); settings.AppConfig.MutableTableServiceConfig()->SetEnableOltpSink(true); @@ -27,50 +28,67 @@ Y_UNIT_TEST_SUITE(KqpOverload) { auto& runtime = *kikimr.GetTestServer().GetRuntime(); Y_UNUSED(runtime); + auto edgeActor = runtime.AllocateEdgeActor(); + + const auto& shards = GetTableShards( + &kikimr.GetTestServer(), + edgeActor, + Distributed ? "/Root/TwoShard" : "/Root/KeyValue"); + UNIT_ASSERT(Distributed ? shards.size() == 2 : shards.size() == 1); + const auto overloadedShard = shards[0]; + const auto overloadedShardActor = ResolveTablet(runtime, overloadedShard); + { - const TString query(Q1_(R"( - UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, 'value'); - SELECT * FROM `/Root/KeyValue`; - )")); + const TString query = + Distributed + ? R"( + UPSERT INTO `/Root/TwoShard` (Key, Value1) VALUES (1, 'value'); + SELECT * FROM `/Root/TwoShard`; + )" + : R"( + UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES (1, 'value'); + SELECT * FROM `/Root/KeyValue`; + )"; std::vector> requests; std::vector> responses; bool blockResults = true; - size_t overloadSeqNo = 0; + size_t overloadSeqNo = 0; auto grab = [&](TAutoPtr &ev) -> auto { if (blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) { auto* msg = ev->Get(); + if (msg->Record.GetOrigin() == overloadedShard) { + auto overloadedResult = NEvents::TDataEvents::TEvWriteResult::BuildError( + msg->Record.GetOrigin(), + msg->Record.GetTxId(), + NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, + ""); - auto overloadedResult = NEvents::TDataEvents::TEvWriteResult::BuildError( - msg->Record.GetOrigin(), - msg->Record.GetTxId(), - NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, - ""); - - UNIT_ASSERT(overloadSeqNo > 0); - overloadedResult->Record.SetOverloadSubscribed(overloadSeqNo); + UNIT_ASSERT(overloadSeqNo > 0); + overloadedResult->Record.SetOverloadSubscribed(overloadSeqNo); - runtime.Send(ev->Recipient, ev->Sender, overloadedResult.release()); + runtime.Send(ev->Recipient, ev->Sender, overloadedResult.release()); - auto overloadedReady = std::make_unique(msg->Record.GetOrigin(), overloadSeqNo); + auto overloadedReady = std::make_unique(msg->Record.GetOrigin(), overloadSeqNo); - runtime.Send(ev->Recipient, ev->Sender, overloadedReady.release()); + runtime.Send(ev->Recipient, ev->Sender, overloadedReady.release()); - responses.emplace_back(ev.Release()); + responses.emplace_back(ev.Release()); - blockResults = false; + blockResults = false; - return TTestActorRuntime::EEventAction::DROP; - } else if (!blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) { + return TTestActorRuntime::EEventAction::DROP; + } + } else if (!blockResults && ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType && ev->GetRecipientRewrite() == overloadedShardActor) { for(auto& ev : responses) { runtime.Send(ev.release()); } responses.clear(); requests.emplace_back(ev.Release()); return TTestActorRuntime::EEventAction::DROP; - } else if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType) { + } else if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWrite::EventType && ev->GetRecipientRewrite() == overloadedShardActor) { auto* msg = ev->Get(); overloadSeqNo = msg->Record.GetOverloadSubscribe(); } @@ -93,14 +111,16 @@ Y_UNIT_TEST_SUITE(KqpOverload) { return requests.size() >= requestsExpected; }); runtime.DispatchEvents(opts); - AFL_ENSURE(requests.size() == requestsExpected); + UNIT_ASSERT(requests.size() == requestsExpected); + UNIT_ASSERT(!blockResults); + UNIT_ASSERT(overloadSeqNo > 0); auto result = runtime.WaitFuture(future); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto tx = result.GetTransaction(); UNIT_ASSERT(tx); - + overloadSeqNo = 0; blockResults = true; ++requestsExpected; @@ -109,7 +129,9 @@ Y_UNIT_TEST_SUITE(KqpOverload) { }); runtime.DispatchEvents(opts); - AFL_ENSURE(requests.size() == requestsExpected); + UNIT_ASSERT(requests.size() == requestsExpected); + UNIT_ASSERT(!blockResults); + UNIT_ASSERT(overloadSeqNo > 0); auto commitResult = runtime.WaitFuture(commitFuture); UNIT_ASSERT_C(commitResult.IsSuccess(), commitResult.GetIssues().ToString());