Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,6 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
return builder;
}()
<< ", Cookie=" << ev->Cookie);
UpdateStats(ev->Get()->Record.GetTxStats());

TxManager->AddParticipantNode(ev->Sender.NodeId());

Expand Down Expand Up @@ -735,6 +734,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::UNSPECIFIED,
Expand All @@ -758,6 +758,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::ABORTED,
Expand All @@ -776,6 +777,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie);
RetryResolve();
} else {
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::UNAVAILABLE,
Expand All @@ -791,6 +793,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::INTERNAL_ERROR,
Expand All @@ -805,6 +808,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::UNAVAILABLE,
Expand All @@ -823,6 +827,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< getIssues().ToOneLineString());
// TODO: support waiting
if (!InconsistentTx) {
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::OVERLOADED,
Expand All @@ -842,6 +847,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< getIssues().ToOneLineString());
// TODO: support waiting
if (!InconsistentTx) {
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::OVERLOADED,
Expand All @@ -859,6 +865,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::CANCELLED,
Expand All @@ -873,6 +880,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::BAD_REQUEST,
Expand All @@ -892,6 +900,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
ResetShardRetries(ev->Get()->Record.GetOrigin(), ev->Cookie);
RetryResolve();
} else {
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::SCHEME_ERROR,
Expand All @@ -909,6 +918,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());

UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->BreakLock(ev->Get()->Record.GetOrigin());
YQL_ENSURE(TxManager->BrokenLocks());
TxManager->SetError(ev->Get()->Record.GetOrigin());
Expand All @@ -925,6 +935,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
<< " ShardID=" << ev->Get()->Record.GetOrigin() << ","
<< " Sink=" << this->SelfId() << "."
<< getIssues().ToOneLineString());
UpdateStats(ev->Get()->Record.GetTxStats());
TxManager->SetError(ev->Get()->Record.GetOrigin());
RuntimeError(
NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
Expand All @@ -942,6 +953,8 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
const auto& record = ev->Get()->Record;
AFL_ENSURE(record.GetTxLocks().empty());

UpdateStats(record.GetTxStats());

IKqpTransactionManager::TPrepareResult preparedInfo;
preparedInfo.ShardId = record.GetOrigin();
preparedInfo.MinStep = record.GetMinStep();
Expand Down Expand Up @@ -979,6 +992,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
if (Mode == EMode::WRITE) {
for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
if (!TxManager->AddLock(ev->Get()->Record.GetOrigin(), lock)) {
UpdateStats(ev->Get()->Record.GetTxStats());
YQL_ENSURE(TxManager->BrokenLocks());
NYql::TIssues issues;
issues.AddIssue(*TxManager->GetLockIssue());
Expand All @@ -991,6 +1005,7 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
}

if (Mode == EMode::COMMIT) {
UpdateStats(ev->Get()->Record.GetTxStats());
Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), 0);
return;
}
Expand All @@ -999,9 +1014,11 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
const auto result = ShardedWriteController->OnMessageAcknowledged(
ev->Get()->Record.GetOrigin(), ev->Cookie);
if (result && result->IsShardEmpty && Mode == EMode::IMMEDIATE_COMMIT) {
UpdateStats(ev->Get()->Record.GetTxStats());
Callbacks->OnCommitted(ev->Get()->Record.GetOrigin(), result->DataSize);
} else if (result) {
AFL_ENSURE(Mode == EMode::WRITE);
UpdateStats(ev->Get()->Record.GetTxStats());
Callbacks->OnMessageAcknowledged(result->DataSize);
}
}
Expand Down
132 changes: 125 additions & 7 deletions ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1122,12 +1122,16 @@ Y_UNIT_TEST_SUITE(KqpCost) {
}


Y_UNIT_TEST_TWIN(OltpWriteRow, isSink) {
Y_UNIT_TEST_QUAD(WriteRow, isSink, isOlap) {
if (isOlap) {
// TODO: same stats for olap?
return;
}
TKikimrRunner kikimr(GetAppConfig(false, false, isSink));
auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();

CreateTestTable(session, false);
CreateTestTable(session, isOlap);

{
auto query = Q_(R"(
Expand All @@ -1143,7 +1147,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

Cerr << stats.DebugString() << Endl;
size_t phase = stats.query_phases_size() - 1;
size_t phase = isOlap ? 0 : stats.query_phases_size() - 1;
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20);

Expand Down Expand Up @@ -1176,7 +1180,7 @@ Y_UNIT_TEST_SUITE(KqpCost) {
auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

Cerr << stats.DebugString() << Endl;
size_t phase = stats.query_phases_size() - 1;
size_t phase = isOlap ? 0 : stats.query_phases_size() - 1;
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 1);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), 20);

Expand Down Expand Up @@ -1387,13 +1391,17 @@ Y_UNIT_TEST_SUITE(KqpCost) {
}
}

Y_UNIT_TEST_TWIN(OltpWriteRowInsertFails, isSink) {
Y_UNIT_TEST_QUAD(WriteRowInsertFails, isSink, isOlap) {
if (isOlap) {
// TODO: same stats for olap?
return;
}
TKikimrRunner kikimr(GetAppConfig(false, false, isSink));
auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();

CreateTestTable(session, false);
CreateTestTable(session, false, "2");
CreateTestTable(session, isOlap);
CreateTestTable(session, isOlap, "2");

{
// Three inserts
Expand Down Expand Up @@ -1744,6 +1752,116 @@ Y_UNIT_TEST_SUITE(KqpCost) {
}
}

Y_UNIT_TEST_TWIN(CTAS, isOlap) {
auto appConfig = GetAppConfig(false, false, true);
appConfig.MutableTableServiceConfig()->SetEnableDataShardCreateTableAs(true);
TKikimrRunner kikimr(appConfig);
auto db = kikimr.GetQueryClient();
auto session = db.GetSession().GetValueSync().GetSession();

CreateTestTable(session, isOlap);

{
auto query = std::format(R"(
CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`;
)", isOlap ? "COLUMN" : "ROW");

auto txControl = NYdb::NQuery::TTxControl::NoTx();

auto result = session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());


auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

Cerr << stats.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1);
size_t phase = 0;
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1568 : 80);

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80);

Check(
FromProto(stats),
TTotalStats{
.Writes = 4,
.Reads = 4,
.Deletes = 0,
});
}
}

Y_UNIT_TEST_TWIN(CTASWithRetry, isOlap) {
auto appConfig = GetAppConfig(false, false, true);
appConfig.MutableTableServiceConfig()->SetEnableDataShardCreateTableAs(true);
appConfig.MutableTableServiceConfig()->MutableWriteActorSettings()->SetInFlightMemoryLimitPerActorBytes(40);
// For executing REPLACE
appConfig.MutableTableServiceConfig()->SetEnableStreamWrite(true);
TKikimrSettings settings(appConfig);
settings.SetUseRealThreads(false);
TKikimrRunner kikimr(settings);
auto db = kikimr.GetQueryClient();
auto session = kikimr.RunCall([&] { return db.GetSession().GetValueSync().GetSession(); });

auto& runtime = *kikimr.GetTestServer().GetRuntime();

kikimr.RunCall([&] {
CreateTestTable(session, isOlap);
});

size_t messages = 0;

auto grab = [&](TAutoPtr<IEventHandle> &ev) -> auto {
if (ev->GetTypeRewrite() == NEvents::TDataEvents::TEvWriteResult::EventType) {
++messages;
auto* msg = ev->Get<NEvents::TDataEvents::TEvWriteResult>();
for (size_t index = 0; index < 3; ++index) {
// Send several duplicates
auto copy = std::make_unique<NEvents::TDataEvents::TEvWriteResult>();
copy->Record = msg->Record;
runtime.Send(new IEventHandle(ev->Recipient, ev->Sender, copy.release(), ev->Flags, ev->Cookie));
}
}
return TTestActorRuntime::EEventAction::PROCESS;
};
runtime.SetObserverFunc(grab);

{
auto query = std::format(R"(
CREATE TABLE `/Root/TestTable2` (PRIMARY KEY (Group, Name)) WITH (STORE={}) AS SELECT * FROM `/Root/TestTable`;
)", isOlap ? "COLUMN" : "ROW");

auto txControl = NYdb::NQuery::TTxControl::NoTx();

auto result = kikimr.RunCall([&] { return session.ExecuteQuery(query, txControl, GetQuerySettings()).ExtractValueSync(); });
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());


auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

Cerr << stats.DebugString() << Endl;
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases_size(), 1);
size_t phase = 0;
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().rows(), 4);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).updates().bytes(), isOlap ? 1568 : 80);

UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().rows(), 4);
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(1).reads().bytes(), isOlap ? 144 : 80);

Check(
FromProto(stats),
TTotalStats{
.Writes = 4,
.Reads = 4,
.Deletes = 0,
});
}

UNIT_ASSERT_EQUAL(messages, isOlap ? 4 : 1);
}

}

}
Expand Down
Loading