From 7647014ac6b489e35a83f13e10e4ee8ad1ca0ad4 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 20 Nov 2025 15:53:28 +0300 Subject: [PATCH 1/4] tests --- ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 61 ++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 2da9d6bc0cb7..f3cba5695b46 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1122,12 +1122,16 @@ Y_UNIT_TEST_SUITE(KqpCost) { } - Y_UNIT_TEST_TWIN(OltpWriteRow, isSink) { + Y_UNIT_TEST_QUAD(WriteRow, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); + CreateTestTable(session, isOlap); { auto query = Q_(R"( @@ -1143,7 +1147,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1176,7 +1180,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1388,13 +1392,17 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } - Y_UNIT_TEST_TWIN(OltpWriteRowInsertFails, isSink) { + Y_UNIT_TEST_QUAD(WriteRowInsertFails, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); - CreateTestTable(session, false, "2"); + CreateTestTable(session, isOlap); + CreateTestTable(session, isOlap, "2"); { // Three inserts @@ -1745,6 +1753,45 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } + Y_UNIT_TEST_TWIN(CTAS, isOlap) { + TKikimrRunner kikimr(GetAppConfig(false, false, true)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, isOlap); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + } + } } From aa0b3f648f819df8fb555d98ef51cb185d5b007a Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 20 Nov 2025 17:46:41 +0300 Subject: [PATCH 2/4] test --- ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 62 +++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index f3cba5695b46..75feb5da25e8 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1753,7 +1753,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } - Y_UNIT_TEST_TWIN(CTAS, isOlap) { + Y_UNIT_TEST_TWIN(CTAS, isOlap) { TKikimrRunner kikimr(GetAppConfig(false, false, true)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); @@ -1771,6 +1771,66 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + } + + Y_UNIT_TEST_TWIN(CTASWithRetry, isOlap) { + auto appConfig = GetAppConfig(false, false, true); + appConfig.MutableTableServiceConfig()->MutableWriteActorSettings()->SetInFlightMemoryLimitPerActorBytes(40); + // For executing REPLACE + appConfig.MutableTableServiceConfig()->SetEnableStreamWrite(true); + TKikimrSettings settings(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetQueryClient(); + auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + + kikimr.RunCall([&] { + CreateTestTable(session, isOlap); + }); + + auto grab = [&](TAutoPtr &ev) -> auto { + if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) { + auto* msg = ev->Get(); + auto copy = std::make_unique(); + copy->Record = msg->Record; + runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie)); + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(grab); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = kikimr.RunCall([&] { return session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); }); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; From 42b45f73a6ac21fcc24e6154c91354b5b574c876 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 20 Nov 2025 17:55:17 +0300 Subject: [PATCH 3/4] fix --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index b7715046c96c..fe9143a0adf7 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -714,7 +714,6 @@ class TKqpTableWriteActor : public TActorBootstrapped { return builder; }() << ", Cookie=" << ev->Cookie); - UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->AddParticipantNode(ev->Sender.NodeId()); @@ -743,6 +742,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNSPECIFIED, @@ -766,6 +766,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::ABORTED, @@ -785,6 +786,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -800,6 +802,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::INTERNAL_ERROR, @@ -814,6 +817,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -832,6 +836,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -851,6 +856,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -868,6 +874,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::CANCELLED, @@ -882,6 +889,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::BAD_REQUEST, @@ -901,6 +909,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::SCHEME_ERROR, @@ -918,6 +927,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->BreakLock(ev->Get()->Record.GetOrigin()); YQL_ENSURE(TxManager->BrokenLocks()); TxManager->SetError(ev->Get()->Record.GetOrigin()); @@ -934,6 +944,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::PRECONDITION_FAILED, @@ -951,6 +962,8 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto& record = ev->Get()->Record; AFL_ENSURE(record.GetTxLocks().empty()); + UpdateStats(record.GetTxStats()); + IKqpTransactionManager::TPrepareResult preparedInfo; preparedInfo.ShardId = record.GetOrigin(); preparedInfo.MinStep = record.GetMinStep(); @@ -988,6 +1001,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { if (Mode == EMode::WRITE) { for (const auto& lock : ev->Get()->Record.GetTxLocks()) { if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) { + UpdateStats(ev->Get()->Record.GetTxStats()); YQL_ENSURE(TxManager->BrokenLocks()); NYql::TIssues issues; issues.AddIssue(*TxManager->GetLockIssue()); @@ -1000,6 +1014,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { } if (Mode == EMode::COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), 0); return; } @@ -1008,9 +1023,11 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto result = ShardedWriteController->OnMessageAcknowledged( ev->Get()->Record.GetOrigin(), ev->Cookie); if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), result->DataSize); } else if (result) { AFL_ENSURE(Mode == EMode::WRITE); + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnMessageAcknowledged(result->DataSize); } } From 83f06bcc404d1650d27544471a212ac0c7dcb80d Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 20 Nov 2025 18:14:34 +0300 Subject: [PATCH 4/4] fix --- ydb/core/kqp/ut/cost/kqp_cost_ut.cpp | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 75feb5da25e8..cff110ef477d 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1809,12 +1809,18 @@ Y_UNIT_TEST_SUITE(KqpCost) { CreateTestTable(session, isOlap); }); + size_t messages = 0; + auto grab = [&](TAutoPtr &ev) -> auto { if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) { + ++messages; auto* msg = ev->Get(); - auto copy = std::make_unique(); - copy->Record = msg->Record; - runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie)); + for (size_t index = 0; index < 3; ++index) { + // Send several duplicates + auto copy = std::make_unique(); + copy->Record = msg->Record; + runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie)); + } } return TTestActorRuntime::EEventAction::PROCESS; }; @@ -1850,6 +1856,8 @@ Y_UNIT_TEST_SUITE(KqpCost) { .Deletes = 0, }); } + + UNIT_ASSERT_EQUAL(messages, isOlap ? 4 : 1); } }