diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index de42fe80ac5d..8712fc521edb 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -146,6 +146,58 @@ namespace { return std::nullopt; } } + + std::optional HandleAttachResult(NKikimr::NKqp::IKqpTransactionManagerPtr& txManager, NKikimr::TEvDataShard::TEvProposeTransactionAttachResult::TPtr& ev) { + const auto& record = ev->Get()->Record; + const ui64 shardId = record.GetTabletId(); + + const auto& reattachState = txManager->GetReattachState(shardId); + if (reattachState.Cookie != ev->Cookie) { + return std::nullopt; + } + + const auto shardState = txManager->GetState(shardId); + switch (shardState) { + case NKikimr::NKqp::IKqpTransactionManager::EXECUTING: + case NKikimr::NKqp::IKqpTransactionManager::PREPARED: + break; + case NKikimr::NKqp::IKqpTransactionManager::PREPARING: + case NKikimr::NKqp::IKqpTransactionManager::FINISHED: + case NKikimr::NKqp::IKqpTransactionManager::ERROR: + case NKikimr::NKqp::IKqpTransactionManager::PROCESSING: + YQL_ENSURE(false); + } + + if (record.GetStatus() == NKikimrProto::OK) { + // Transaction still exists at this shard + txManager->Reattached(shardId); + } + return record.GetStatus() == NKikimrProto::OK; + } + + bool HandleTransactionRestart(NKikimr::NKqp::IKqpTransactionManagerPtr& txManager, NKikimr::TEvDataShard::TEvProposeTransactionRestart::TPtr& ev) { + const auto& record = ev->Get()->Record; + const ui64 shardId = record.GetTabletId(); + + switch (txManager->GetState(shardId)) { + case NKikimr::NKqp::IKqpTransactionManager::PREPARED: + case NKikimr::NKqp::IKqpTransactionManager::EXECUTING: { + txManager->SetRestarting(shardId); + return true; + } + case NKikimr::NKqp::IKqpTransactionManager::PREPARING: { + return false; + } + case NKikimr::NKqp::IKqpTransactionManager::FINISHED: + case NKikimr::NKqp::IKqpTransactionManager::ERROR: { + return true; + } + case 
NKikimr::NKqp::IKqpTransactionManager::PROCESSING: { + YQL_ENSURE(false); + } + } + return true; + } } @@ -1227,33 +1279,12 @@ class TKqpTableWriteActor : public TActorBootstrapped { } void Handle(TEvDataShard::TEvProposeTransactionAttachResult::TPtr& ev) { - const auto& record = ev->Get()->Record; - const ui64 shardId = record.GetTabletId(); - - const auto& reattachState = TxManager->GetReattachState(shardId); - if (reattachState.Cookie != ev->Cookie) { + const auto result = HandleAttachResult(TxManager, ev); + if (!result) { return; } - - const auto shardState = TxManager->GetState(shardId); - switch (shardState) { - case IKqpTransactionManager::EXECUTING: - YQL_ENSURE(Mode == EMode::COMMIT || Mode == EMode::IMMEDIATE_COMMIT); - break; - case IKqpTransactionManager::PREPARED: - YQL_ENSURE(Mode == EMode::PREPARE); - break; - case IKqpTransactionManager::PREPARING: - case IKqpTransactionManager::FINISHED: - case IKqpTransactionManager::ERROR: - case IKqpTransactionManager::PROCESSING: - YQL_ENSURE(false); - } - - if (record.GetStatus() == NKikimrProto::OK) { - // Transaction still exists at this shard - CA_LOG_D("Reattached to shard " << shardId); - TxManager->Reattached(shardId); + if (*result) { + CA_LOG_D("Reattached to shard " << ev->Get()->Record.GetTabletId()); return; } @@ -1262,42 +1293,29 @@ class TKqpTableWriteActor : public TActorBootstrapped { NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() - << "ShardId=" << shardId - << " for table '" << TablePath - << "': attach transaction failed."); + << "Disconnected from shard " << ev->Get()->Record.GetTabletId() << "." 
+ << "Table: `" << TablePath << "`."); return; } else { RuntimeError( NYql::NDqProto::StatusIds::UNDETERMINED, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, TStringBuilder() - << "ShardId=" << shardId - << " for table '" << TablePath - << "': attach transaction failed."); + << "Disconnected from shard " << ev->Get()->Record.GetTabletId() << "." + << "Table: `" << TablePath << "`."); return; } } void Handle(TEvDataShard::TEvProposeTransactionRestart::TPtr& ev) { - const auto& record = ev->Get()->Record; - const ui64 shardId = record.GetTabletId(); - - CA_LOG_D("Got transaction restart event from tabletId: " << shardId); - - switch (TxManager->GetState(shardId)) { - case IKqpTransactionManager::EXECUTING: { - TxManager->SetRestarting(shardId); - return; - } - case IKqpTransactionManager::FINISHED: - case IKqpTransactionManager::ERROR: { - return; - } - case IKqpTransactionManager::PREPARING: - case IKqpTransactionManager::PREPARED: - case IKqpTransactionManager::PROCESSING: { - YQL_ENSURE(false); - } + CA_LOG_D("Got transaction restart event from tabletId: " << ev->Get()->Record.GetTabletId()); + if (!HandleTransactionRestart(TxManager, ev)) { + RuntimeError( + NYql::NDqProto::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + TStringBuilder() + << "Disconnected from shard " << ev->Get()->Record.GetTabletId() << "." 
+ << "Table: `" << TablePath << "`."); } } @@ -2101,6 +2119,19 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub using TBase = TActorBootstrapped<TKqpBufferWriteActor>; using TTopicTabletTxs = NTopic::TTopicOperationTransactions; + struct TEvPrivate { /* Events private to this actor */ + enum EEv { + EvReattachToShard = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), + }; + + struct TEvReattachToShard : public TEventLocal<TEvReattachToShard, EvReattachToShard> { /* Carries the id of the tablet a reattach attempt should target */ + const ui64 TabletId; + + explicit TEvReattachToShard(ui64 tabletId) + : TabletId(tabletId) {} + }; + }; + public: TKqpBufferWriteActor( TKqpBufferWriterSettings&& settings) @@ -2175,9 +2206,13 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub hFunc(TEvKqpBuffer::TEvTerminate, Handle); hFunc(TEvPersQueue::TEvProposeTransactionResult, HandlePrepare); hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, HandlePrepare); - hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + hFunc(TEvPipeCache::TEvDeliveryProblem, HandlePrepare); hFunc(TEvDataShard::TEvOverloadReady, HandlePrepare); hFunc(TEvColumnShard::TEvOverloadReady, HandlePrepare); + + hFunc(TEvDataShard::TEvProposeTransactionAttachResult, HandlePrepare); + hFunc(TEvPrivate::TEvReattachToShard, Handle); + hFunc(TEvDataShard::TEvProposeTransactionRestart, Handle); default: AFL_ENSURE(false)("unknown message", ev->GetTypeRewrite()); } @@ -2196,6 +2231,10 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub hFunc(TEvPersQueue::TEvProposeTransactionResult, HandleCommit); hFunc(NKikimr::NEvents::TDataEvents::TEvWriteResult, HandleCommit); hFunc(TEvPipeCache::TEvDeliveryProblem, HandleCommit); + + hFunc(TEvDataShard::TEvProposeTransactionAttachResult, HandleCommit); + hFunc(TEvPrivate::TEvReattachToShard, Handle); + hFunc(TEvDataShard::TEvProposeTransactionRestart, Handle); default: AFL_ENSURE(false)("unknown message", ev->GetTypeRewrite()); } @@ -3021,6 +3060,66 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub } } + void HandlePrepare(TEvDataShard::TEvProposeTransactionAttachResult::TPtr& ev) { + const auto result 
= HandleAttachResult(TxManager, ev); + if (!result) { /* nullopt: cookie mismatch reported by HandleAttachResult, ignore the event */ + return; + } + if (*result) { + CA_LOG_D("Reattached to shard " << ev->Get()->Record.GetTabletId()); + return; + } + + ReplyErrorAndDie( + NYql::NDqProto::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + TStringBuilder() + << "Disconnected from shard " << ev->Get()->Record.GetTabletId() << ". " + << GetPathes(ev->Get()->Record.GetTabletId()) << "."); + } + + void HandleCommit(TEvDataShard::TEvProposeTransactionAttachResult::TPtr& ev) { /* Same flow as HandlePrepare, but a failed attach is reported as UNDETERMINED */ + const auto result = HandleAttachResult(TxManager, ev); + if (!result) { + return; + } + if (*result) { + CA_LOG_D("Reattached to shard " << ev->Get()->Record.GetTabletId()); + return; + } + + ReplyErrorAndDie( + NYql::NDqProto::StatusIds::UNDETERMINED, + NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, + TStringBuilder() + << "Disconnected from shard " << ev->Get()->Record.GetTabletId() << ". " + << GetPathes(ev->Get()->Record.GetTabletId()) << "."); + } + + void Handle(TEvDataShard::TEvProposeTransactionRestart::TPtr& ev) { /* Fails the request with UNAVAILABLE when HandleTransactionRestart returns false */ + CA_LOG_D("Got transaction restart event from tabletId: " << ev->Get()->Record.GetTabletId()); + if (!HandleTransactionRestart(TxManager, ev)) { + ReplyErrorAndDie( + NYql::NDqProto::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + TStringBuilder() + << "Disconnected from shard " << ev->Get()->Record.GetTabletId() << ". " 
+ << GetPathes(ev->Get()->Record.GetTabletId()) << "."); + } + } + + void Handle(TEvPrivate::TEvReattachToShard::TPtr& ev) { /* Sends TEvProposeTransactionAttach for the tablet through the pipe cache, passing the pre-incremented reattach Cookie with the request. */ + const ui64 tabletId = ev->Get()->TabletId; + auto& state = TxManager->GetReattachState(tabletId); + + CA_LOG_D("Reattach to shard " << tabletId); + + YQL_ENSURE(TxId); + Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward( + new TEvDataShard::TEvProposeTransactionAttach(tabletId, *TxId), + tabletId, /* subscribe */ true), 0, ++state.Cookie); + } + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { CA_LOG_W("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId); ReplyErrorAndDie( @@ -3028,7 +3127,26 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() << "Kikimr cluster or one of its subsystems was unavailable. Failed to deviler message.", {}); - return; + } + + void HandlePrepare(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { /* If the shard is PREPARED and ShouldReattach() agrees, schedules TEvReattachToShard after ReattachInfo.Delay; otherwise replies with UNAVAILABLE. */ + CA_LOG_W("TEvDeliveryProblem was received from tablet: " << ev->Get()->TabletId); + + const auto state = TxManager->GetState(ev->Get()->TabletId); + if (state == IKqpTransactionManager::PREPARED && TxManager->ShouldReattach(ev->Get()->TabletId, TlsActivationContext->Now())) { + const auto& reattachState = TxManager->GetReattachState(ev->Get()->TabletId); + CA_LOG_N("Shard " << ev->Get()->TabletId << " delivery problem (reattaching in " + << reattachState.ReattachInfo.Delay << ")"); + + Schedule(reattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(ev->Get()->TabletId)); + return; + } + + ReplyErrorAndDie( + NYql::NDqProto::StatusIds::UNAVAILABLE, + NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + TStringBuilder() << "Kikimr cluster or one of its subsystems was unavailable. 
Failed to deviler message.", + {}); } void HandleCommit(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { @@ -3057,12 +3175,21 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub return; } + const auto state = TxManager->GetState(ev->Get()->TabletId); + if (state == IKqpTransactionManager::EXECUTING && TxManager->ShouldReattach(ev->Get()->TabletId, TlsActivationContext->Now())) { + const auto& reattachState = TxManager->GetReattachState(ev->Get()->TabletId); + CA_LOG_N("Shard " << ev->Get()->TabletId << " delivery problem (reattaching in " + << reattachState.ReattachInfo.Delay << ")"); + + Schedule(reattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(ev->Get()->TabletId)); + return; + } + ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNDETERMINED, NYql::TIssuesIds::KIKIMR_OPERATION_STATE_UNKNOWN, TStringBuilder() << "State of operation is unknown. Failed to deviler message.", {}); - return; } void Handle(TEvKqpBuffer::TEvTerminate::TPtr&) { @@ -3215,18 +3342,6 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub return issues; }; - auto getPathes = [&]() -> TString { - const auto tableInfo = TxManager->GetShardTableInfo(ev->Get()->Record.GetOrigin()); - TStringBuilder builder; - for (const auto& path : tableInfo.Pathes) { - if (!builder.empty()) { - builder << ", "; - } - builder << "`" << path << "`"; - } - return (tableInfo.Pathes.size() == 1 ? "Table: " : "Tables: ") + builder; - }; - switch (ev->Get()->GetStatus()) { case NKikimrDataEvents::TEvWriteResult::STATUS_UNSPECIFIED: { CA_LOG_E("Got UNSPECIFIED for tables." @@ -3237,7 +3352,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNSPECIFIED, NYql::TIssuesIds::DEFAULT_ERROR, - TStringBuilder() << "Unspecified error. " << getPathes() << ". " + TStringBuilder() << "Unspecified error. " << GetPathes(ev->Get()->Record.GetOrigin()) << ". 
" << getIssues().ToOneLineString(), getIssues()); return; @@ -3267,7 +3382,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, - TStringBuilder() << "Wrong shard state. " << getPathes() << ".", + TStringBuilder() << "Wrong shard state. " << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3293,7 +3408,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDie( NYql::NDqProto::StatusIds::UNAVAILABLE, NYql::TIssuesIds::KIKIMR_DISK_SPACE_EXHAUSTED, - TStringBuilder() << "Disk space exhausted. " << getPathes() << ".", + TStringBuilder() << "Disk space exhausted. " << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3307,7 +3422,8 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDie( NYql::NDqProto::StatusIds::OVERLOADED, NYql::TIssuesIds::KIKIMR_OVERLOADED, - TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is out of space. " << getPathes() << ".", + TStringBuilder() << "Tablet " << ev->Get()->Record.GetOrigin() << " is out of space. " + << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3323,7 +3439,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub NYql::TIssuesIds::KIKIMR_OVERLOADED, TStringBuilder() << "Kikimr cluster or one of its subsystems is overloaded." << " Tablet " << ev->Get()->Record.GetOrigin() << " is overloaded." - << " " << getPathes() << ".", + << " " << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3349,7 +3465,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDie( NYql::NDqProto::StatusIds::BAD_REQUEST, NYql::TIssuesIds::KIKIMR_BAD_REQUEST, - TStringBuilder() << "Bad request. " << getPathes() << ".", + TStringBuilder() << "Bad request. 
" << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3362,7 +3478,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDie( NYql::NDqProto::StatusIds::SCHEME_ERROR, NYql::TIssuesIds::KIKIMR_SCHEME_MISMATCH, - TStringBuilder() << "Scheme changed. " << getPathes() << ".", + TStringBuilder() << "Scheme changed. " << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3379,7 +3495,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub NYql::TIssuesIds::KIKIMR_LOCKS_INVALIDATED, TStringBuilder() << "Transaction locks invalidated. " - << getPathes() << ".", + << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3392,7 +3508,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDie( NYql::NDqProto::StatusIds::PRECONDITION_FAILED, NYql::TIssuesIds::KIKIMR_CONSTRAINT_VIOLATION, - TStringBuilder() << "Constraint violated. " << getPathes() << ".", + TStringBuilder() << "Constraint violated. " << GetPathes(ev->Get()->Record.GetOrigin()) << ".", getIssues()); return; } @@ -3617,6 +3733,18 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub PassAway(); } + TString GetPathes(ui64 shardId) const { + const auto tableInfo = TxManager->GetShardTableInfo(shardId); + TStringBuilder builder; + for (const auto& path : tableInfo.Pathes) { + if (!builder.empty()) { + builder << ", "; + } + builder << "`" << path << "`"; + } + return (tableInfo.Pathes.size() == 1 ? "Table: " : "Tables: ") + builder; + } + NYql::NDqProto::TDqTaskStats BuildStats() { NYql::NDqProto::TDqTaskStats result; ForEachWriteActor([&](TKqpTableWriteActor* actor, const TActorId) {