diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 45d392081219..762814b4fbc0 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -706,7 +706,6 @@ class TKqpTableWriteActor : public TActorBootstrapped { return builder; }() << ", Cookie=" << ev->Cookie); - UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->AddParticipantNode(ev->Sender.NodeId()); @@ -735,6 +734,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNSPECIFIED, @@ -758,6 +758,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::ABORTED, @@ -776,6 +777,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -791,6 +793,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::INTERNAL_ERROR, @@ -805,6 +808,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -823,6 +827,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -842,6 +847,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -859,6 +865,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::CANCELLED, @@ -873,6 +880,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::BAD_REQUEST, @@ -892,6 +900,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::SCHEME_ERROR, @@ -909,6 +918,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->BreakLock(ev->Get()->Record.GetOrigin()); YQL_ENSURE(TxManager->BrokenLocks()); TxManager->SetError(ev->Get()->Record.GetOrigin()); @@ -925,6 +935,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::PRECONDITION_FAILED, @@ -942,6 +953,8 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto& record = ev->Get()->Record; AFL_ENSURE(record.GetTxLocks().empty()); + UpdateStats(record.GetTxStats()); + IKqpTransactionManager::TPrepareResult preparedInfo; preparedInfo.ShardId = record.GetOrigin(); preparedInfo.MinStep = record.GetMinStep(); @@ -979,6 +992,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { if (Mode == EMode::WRITE) { for (const auto& lock : ev->Get()->Record.GetTxLocks()) { if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) { + UpdateStats(ev->Get()->Record.GetTxStats()); YQL_ENSURE(TxManager->BrokenLocks()); NYql::TIssues issues; issues.AddIssue(*TxManager->GetLockIssue()); @@ -991,6 +1005,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { } if (Mode == EMode::COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), 0); return; } @@ -999,9 +1014,11 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto result = ShardedWriteController->OnMessageAcknowledged( ev->Get()->Record.GetOrigin(), ev->Cookie); if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), result->DataSize); } else if (result) { AFL_ENSURE(Mode == EMode::WRITE); + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnMessageAcknowledged(result->DataSize); } } diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 40418bb28bd4..ec14b701b521 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1122,12 +1122,16 @@ Y_UNIT_TEST_SUITE(KqpCost) { } - Y_UNIT_TEST_TWIN(OltpWriteRow, isSink) { + Y_UNIT_TEST_QUAD(WriteRow, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); + CreateTestTable(session, isOlap); { auto query = Q_(R"( @@ -1143,7 +1147,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1176,7 +1180,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1387,13 +1391,17 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } - Y_UNIT_TEST_TWIN(OltpWriteRowInsertFails, isSink) { + Y_UNIT_TEST_QUAD(WriteRowInsertFails, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); - CreateTestTable(session, false, "2"); + CreateTestTable(session, isOlap); + CreateTestTable(session, isOlap, "2"); { // Three inserts @@ -1744,6 +1752,116 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } + Y_UNIT_TEST_TWIN(CTAS, isOlap) { + auto appConfig = GetAppConfig(false, false, true); + appConfig.MutableTableServiceConfig()->SetEnableDataShardCreateTableAs(true); + TKikimrRunner kikimr(appConfig); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, isOlap); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1568 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + } + + Y_UNIT_TEST_TWIN(CTASWithRetry, isOlap) { + auto appConfig = GetAppConfig(false, false, true); + appConfig.MutableTableServiceConfig()->SetEnableDataShardCreateTableAs(true); + appConfig.MutableTableServiceConfig()->MutableWriteActorSettings()->SetInFlightMemoryLimitPerActorBytes(40); + // For executing REPLACE + appConfig.MutableTableServiceConfig()->SetEnableStreamWrite(true); + TKikimrSettings settings(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetQueryClient(); + auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + + kikimr.RunCall([&] { + CreateTestTable(session, isOlap); + }); + + size_t messages = 0; + + auto grab = [&](TAutoPtr &ev) -> auto { + if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) { + ++messages; + auto* msg = ev->Get(); + for (size_t index = 0; index < 3; ++index) { + // Send several duplicates + auto copy = std::make_unique(); + copy->Record = msg->Record; + runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie)); + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(grab); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = kikimr.RunCall([&] { return session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); }); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1568 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + + UNIT_ASSERT_EQUAL(messages, isOlap ? 4 : 1); + } + } }