diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index b7715046c96c..fe9143a0adf7 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -714,7 +714,6 @@ class TKqpTableWriteActor : public TActorBootstrapped { return builder; }() << ", Cookie=" << ev->Cookie); - UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->AddParticipantNode(ev->Sender.NodeId()); @@ -743,6 +742,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNSPECIFIED, @@ -766,6 +766,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::ABORTED, @@ -785,6 +786,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -800,6 +802,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::INTERNAL_ERROR, @@ -814,6 +817,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::UNAVAILABLE, @@ -832,6 +836,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -851,6 +856,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << getIssues().ToOneLineString()); // TODO: support waiting if (!InconsistentTx) { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::OVERLOADED, @@ -868,6 +874,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::CANCELLED, @@ -882,6 +889,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::BAD_REQUEST, @@ -901,6 +909,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie); RetryResolve(); } else { + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::SCHEME_ERROR, @@ -918,6 +927,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->BreakLock(ev->Get()->Record.GetOrigin()); YQL_ENSURE(TxManager->BrokenLocks()); TxManager->SetError(ev->Get()->Record.GetOrigin()); @@ -934,6 +944,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { << " ShardID=" << ev->Get()->Record.GetOrigin() << "," << " Sink=" << this->SelfId() << "." << getIssues().ToOneLineString()); + UpdateStats(ev->Get()->Record.GetTxStats()); TxManager->SetError(ev->Get()->Record.GetOrigin()); RuntimeError( NYql::NDqProto::StatusIds::PRECONDITION_FAILED, @@ -951,6 +962,8 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto& record = ev->Get()->Record; AFL_ENSURE(record.GetTxLocks().empty()); + UpdateStats(record.GetTxStats()); + IKqpTransactionManager::TPrepareResult preparedInfo; preparedInfo.ShardId = record.GetOrigin(); preparedInfo.MinStep = record.GetMinStep(); @@ -988,6 +1001,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { if (Mode == EMode::WRITE) { for (const auto& lock : ev->Get()->Record.GetTxLocks()) { if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) { + UpdateStats(ev->Get()->Record.GetTxStats()); YQL_ENSURE(TxManager->BrokenLocks()); NYql::TIssues issues; issues.AddIssue(*TxManager->GetLockIssue()); @@ -1000,6 +1014,7 @@ class TKqpTableWriteActor : public TActorBootstrapped { } if (Mode == EMode::COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), 0); return; } @@ -1008,9 +1023,11 @@ class TKqpTableWriteActor : public TActorBootstrapped { const auto result = ShardedWriteController->OnMessageAcknowledged( ev->Get()->Record.GetOrigin(), ev->Cookie); if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) { + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), result->DataSize); } else if (result) { AFL_ENSURE(Mode == EMode::WRITE); + UpdateStats(ev->Get()->Record.GetTxStats()); Callbacks->OnMessageAcknowledged(result->DataSize); } } diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 2da9d6bc0cb7..cff110ef477d 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -1122,12 +1122,16 @@ Y_UNIT_TEST_SUITE(KqpCost) { } - Y_UNIT_TEST_TWIN(OltpWriteRow, isSink) { + Y_UNIT_TEST_QUAD(WriteRow, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); + CreateTestTable(session, isOlap); { auto query = Q_(R"( @@ -1143,7 +1147,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1176,7 +1180,7 @@ Y_UNIT_TEST_SUITE(KqpCost) { auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); Cerr << stats.DebugString() << Endl; - size_t phase = stats.query_phases_size() - 1; + size_t phase = isOlap ? 0 : stats.query_phases_size() - 1; UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1); UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20); @@ -1388,13 +1392,17 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } - Y_UNIT_TEST_TWIN(OltpWriteRowInsertFails, isSink) { + Y_UNIT_TEST_QUAD(WriteRowInsertFails, isSink, isOlap) { + if (isOlap) { + // TODO: same stats for olap? + return; + } TKikimrRunner kikimr(GetAppConfig(false, false, isSink)); auto db = kikimr.GetQueryClient(); auto session = db.GetSession().GetValueSync().GetSession(); - CreateTestTable(session, false); - CreateTestTable(session, false, "2"); + CreateTestTable(session, isOlap); + CreateTestTable(session, isOlap, "2"); { // Three inserts @@ -1745,6 +1753,113 @@ Y_UNIT_TEST_SUITE(KqpCost) { } } + Y_UNIT_TEST_TWIN(CTAS, isOlap) { + TKikimrRunner kikimr(GetAppConfig(false, false, true)); + auto db = kikimr.GetQueryClient(); + auto session = db.GetSession().GetValueSync().GetSession(); + + CreateTestTable(session, isOlap); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + } + + Y_UNIT_TEST_TWIN(CTASWithRetry, isOlap) { + auto appConfig = GetAppConfig(false, false, true); + appConfig.MutableTableServiceConfig()->MutableWriteActorSettings()->SetInFlightMemoryLimitPerActorBytes(40); + // For executing REPLACE + appConfig.MutableTableServiceConfig()->SetEnableStreamWrite(true); + TKikimrSettings settings(appConfig); + settings.SetUseRealThreads(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetQueryClient(); + auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); }); + + auto& runtime = *kikimr.GetTestServer().GetRuntime(); + + kikimr.RunCall([&] { + CreateTestTable(session, isOlap); + }); + + size_t messages = 0; + + auto grab = [&](TAutoPtr &ev) -> auto { + if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) { + ++messages; + auto* msg = ev->Get(); + for (size_t index = 0; index < 3; ++index) { + // Send several duplicates + auto copy = std::make_unique(); + copy->Record = msg->Record; + runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie)); + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + runtime.SetObserverFunc(grab); + + { + auto query = std::format(R"( + CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`; + )", isOlap ? "COLUMN" : "ROW"); + + auto txControl = NYdb::NQuery::TTxControl::NoTx(); + + auto result = kikimr.RunCall([&] { return session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); }); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + Cerr << stats.DebugString() << Endl; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1); + size_t phase = 0; + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1472 : 80); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80); + + Check( + FromProto(stats), + TTotalStats{ + .Writes = 4, + .Reads = 4, + .Deletes = 0, + }); + } + + UNIT_ASSERT_EQUAL(messages, isOlap ? 4 : 1); + } + } }