diff --git a/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp b/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp index 2278dd636d70..2a28139e348e 100644 --- a/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp +++ b/ydb/core/kqp/ut/tx/kqp_snapshot_isolation_ut.cpp @@ -66,6 +66,11 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) { } class TConflictWrite : public TTableDataModificationTester { + std::string WriteOperation = "insert"; + + public: + TConflictWrite(const std::string& writeOperation) : WriteOperation(writeOperation) {} + protected: void DoExecute() override { auto client = Kikimr->GetQueryClient(); @@ -76,7 +81,6 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) { SELECT * FROM `/Root/KV`; )"), TTxControl::BeginTx(TTxSettings::SnapshotRW())).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - auto tx1 = result.GetTransaction(); UNIT_ASSERT(tx1); @@ -86,12 +90,39 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) { )"), TTxControl::BeginTx(TTxSettings::SnapshotRW()).CommitTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - result = session1.ExecuteQuery(Q_(R"( - UPSERT INTO `/Root/Test` (Group, Name, Comment) - VALUES (1U, "Paul", "Changed"); - )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); - // Keys changed since taking snapshot. - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + if (WriteOperation == "insert") { + result = session1.ExecuteQuery(Q_(R"( + INSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + } else if (WriteOperation == "upsert_partial") { + result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + } else if (WriteOperation == "upsert_full") { + result = session1.ExecuteQuery(Q_(R"( + UPSERT INTO `/Root/Test` (Group, Name, Comment, Amount) + VALUES (1U, "Paul", "Changed", 301ul); + )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + } else if (WriteOperation == "replace") { + result = session1.ExecuteQuery(Q_(R"( + REPLACE INTO `/Root/Test` (Group, Name, Comment) + VALUES (1U, "Paul", "Changed"); + )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + } else if (WriteOperation == "delete") { + result = session1.ExecuteQuery(Q_(R"( + DELETE FROM `/Root/Test` WHERE Name == "Paul"; + )"), TTxControl::Tx(*tx1).CommitTx()).ExtractValueSync(); + } else { + UNIT_ASSERT(false); + } + + if (WriteOperation == "insert") { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + } else { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::ABORTED, result.GetIssues().ToString()); + } result = session2.ExecuteQuery(Q_(R"( SELECT * FROM `/Root/Test` WHERE Name == "Paul" ORDER BY Group, Name; @@ -103,22 +134,45 @@ Y_UNIT_TEST_SUITE(KqpSnapshotIsolation) { Y_UNIT_TEST(TConflictWriteOltp) { return; - TConflictWrite tester; + TConflictWrite tester("upsert_partial"); tester.SetIsOlap(false); tester.Execute(); } Y_UNIT_TEST(TConflictWriteOltpNoSink) { return; - TConflictWrite tester; + TConflictWrite tester("upsert_partial"); tester.SetIsOlap(false); tester.SetDisableSinks(true); tester.Execute(); } - Y_UNIT_TEST(TConflictWriteOlap) { - return; - TConflictWrite tester; + Y_UNIT_TEST(TConflictWriteOlapInsert) { + TConflictWrite tester("insert"); + tester.SetIsOlap(true); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictWriteOlapUpsertPartial) { + TConflictWrite tester("upsert_partial"); + tester.SetIsOlap(true); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictWriteOlapUpsertFull) { + TConflictWrite tester("upsert_full"); + tester.SetIsOlap(true); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictWriteOlapReplace) { + TConflictWrite tester("replace"); + tester.SetIsOlap(true); + tester.Execute(); + } + + Y_UNIT_TEST(TConflictWriteOlapDelete) { + TConflictWrite tester("delete"); tester.SetIsOlap(true); tester.Execute(); } diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 1fce06e86b8f..6b96c01a6af1 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -105,6 +105,7 @@ namespace TEvColumnShard { YDB_READONLY_DEF(NColumnShard::TUnifiedPathId, PathId); YDB_READONLY(NOlap::TSnapshot, Snapshot, NOlap::TSnapshot::Zero()); YDB_READONLY_DEF(std::optional, LockId); + YDB_READONLY_DEF(bool, ReadOnlyConflicts); YDB_ACCESSOR(bool, Reverse, false); YDB_ACCESSOR(ui32, ItemsLimit, 0); YDB_READONLY_DEF(std::vector, ColumnIds); @@ -118,10 +119,11 @@ namespace TEvColumnShard { ColumnIds.emplace_back(id); } - TEvInternalScan(const NColumnShard::TUnifiedPathId pathId, const NOlap::TSnapshot& snapshot, const std::optional lockId) + TEvInternalScan(const NColumnShard::TUnifiedPathId pathId, const NOlap::TSnapshot& snapshot, const std::optional lockId, const bool readOnlyConflicts) : PathId(pathId) , Snapshot(snapshot) , LockId(lockId) + , ReadOnlyConflicts(readOnlyConflicts) { AFL_VERIFY(Snapshot.Valid()); } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 01ed82fb6090..b7e9bbbac23e 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -542,6 +542,8 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor } else { lockId = record.GetLockTxId(); } + // Serializable by default + NKikimrDataEvents::ELockMode lockMode = record.HasLockMode() ? record.GetLockMode() : NKikimrDataEvents::OPTIMISTIC; const bool isBulk = operation.HasIsBulk() && operation.GetIsBulk(); @@ -578,7 +580,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor LWPROBE(EvWrite, TabletID(), source.ToString(), cookie, record.GetTxId(), writeTimeout.value_or(TDuration::Max()), arrowData->GetSize(), "", true, operation.GetIsBulk(), "", ""); - WriteTasksQueue->Enqueue(TWriteTask(arrowData, schema, source, ev->Recipient, granuleShardingVersionId, pathId, cookie, mvccSnapshot, lockId, *mType, behaviour, writeTimeout, record.GetTxId(), + WriteTasksQueue->Enqueue(TWriteTask(arrowData, schema, source, ev->Recipient, granuleShardingVersionId, pathId, cookie, mvccSnapshot, lockId, lockMode, *mType, behaviour, writeTimeout, record.GetTxId(), isBulk, record.HasOverloadSubscribe() ? record.GetOverloadSubscribe() : std::optional())); WriteTasksQueue->Drain(false, ctx); } diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h index 457fdab22333..c2ea32bda3cc 100644 --- a/ydb/core/tx/columnshard/engines/reader/common/description.h +++ b/ydb/core/tx/columnshard/engines/reader/common/description.h @@ -61,18 +61,20 @@ class TReadDescription { ScanCursor = cursor; } - void SetLock(std::optional lockId, std::optional lockMode, const NColumnShard::TLockFeatures* lock) { + void SetLock(std::optional lockId, std::optional lockMode, const NColumnShard::TLockFeatures* lock, const bool readOnlyConflicts) { LockId = lockId; LockMode = lockMode; - auto snapshotIsolation = lockMode.value_or(NKikimrDataEvents::OPTIMISTIC) == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION; + auto snapshotIsolation = lockId.has_value() && lockMode.value_or(NKikimrDataEvents::OPTIMISTIC) == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION; - // always true for now, will be false for reads that check only conflicts (comming soon with Snapshot Isolation) - readNonconflictingPortions = true; + readNonconflictingPortions = !readOnlyConflicts; // do not check conflicts for Snapshot isolated txs or txs with no lock - readConflictingPortions = LockId.has_value() && !snapshotIsolation; + readConflictingPortions = (LockId.has_value() && !snapshotIsolation) || readOnlyConflicts; + // if we need conflicting portions, we just take all uncommitted portions (from other txs and own) + // but if we do not need conflicting portions, we need to remember own portions ids, + // so that we can pick only own uncommitted portions for the read if (readNonconflictingPortions && !readConflictingPortions && lock != nullptr && lock->GetWriteOperations().size() > 0) { ownPortions = THashSet(); for (auto& writeOperation : lock->GetWriteOperations()) { @@ -87,6 +89,10 @@ class TReadDescription { if (ownPortions.has_value() && !ownPortions->empty()) { AFL_VERIFY(readNonconflictingPortions); } + // we do not have cases (at the moment) when we need to read only conflicts for a scan with no transaction + if (!LockId.has_value()) { + AFL_VERIFY(!readOnlyConflicts); + } } TReadDescription(const ui64 tabletId, const TSnapshot& snapshot, const ERequestSorting sorting) diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp index 08862ef37ab2..b3ae749bcc6b 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp @@ -55,7 +55,8 @@ void TTxInternalScan::Complete(const TActorContext& ctx) { read.SetLock( request.GetLockId(), NKikimrDataEvents::OPTIMISTIC, - request.GetLockId().has_value() ? Self->GetOperationsManager().GetLockOptional(request.GetLockId().value()) : nullptr + request.GetLockId().has_value() ? Self->GetOperationsManager().GetLockOptional(request.GetLockId().value()) : nullptr, + request.GetReadOnlyConflicts() ); read.DeduplicationPolicy = EDeduplicationPolicy::PREVENT_DUPLICATES; std::unique_ptr scannerConstructor(new NPlain::TIndexScannerConstructor(context)); diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp index 140fea1d072c..c9df075216cc 100644 --- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp +++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_scan.cpp @@ -71,7 +71,8 @@ void TTxScan::Complete(const TActorContext& ctx) { read.SetLock( request.HasLockTxId() ? std::make_optional(request.GetLockTxId()) : std::nullopt, request.HasLockMode() ? std::make_optional(request.GetLockMode()) : std::nullopt, - Self->GetOperationsManager().GetLockOptional(request.GetLockTxId()) + request.HasLockTxId() ? Self->GetOperationsManager().GetLockOptional(request.GetLockTxId()) : nullptr, + false ); { diff --git a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp index 1b91fe8de782..412cd4f0201a 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/builder.cpp @@ -48,21 +48,35 @@ void TBuildBatchesTask::DoExecute(const std::shared_ptr& /*taskPtr*/) { } auto batch = preparedConclusion.DetachResult(); std::shared_ptr merger; + bool readOnlyConflicts = false; + auto handleReplace = [&]() -> bool { + if (Context.GetNoTxWrite()) { + NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(), + new NWritingPortions::TEvAddInsertedDataToBuffer( + std::make_shared(WriteData), batch, std::make_shared(Context))); + return false; + } else if (Context.IsSnapshotIsolated()) { + merger = std::make_shared(batch, Context.GetActualSchema()); + readOnlyConflicts = true; + return true; + } else { + std::shared_ptr task = + std::make_shared(std::move(WriteData), batch.GetContainer(), Context); + NConveyorComposite::TInsertServiceOperator::SendTaskToExecute(task); + return false; + } + }; switch (WriteData.GetWriteMeta().GetModificationType()) { case NEvWrite::EModificationType::Upsert: { const std::vector> defaultFields = Context.GetActualSchema()->GetAbsentFields(batch.GetContainer()->schema()); if (defaultFields.empty()) { - if (!Context.GetNoTxWrite()) { - std::shared_ptr task = - std::make_shared(std::move(WriteData), batch.GetContainer(), Context); - NConveyorComposite::TInsertServiceOperator::SendTaskToExecute(task); + auto proceed = handleReplace(); + if (proceed) { + break; } else { - NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(), - new NWritingPortions::TEvAddInsertedDataToBuffer( - std::make_shared(WriteData), batch, std::make_shared(Context))); + return; } - return; } else { auto insertionConclusion = Context.GetActualSchema()->CheckColumnsDefault(defaultFields); auto conclusion = Context.GetActualSchema()->BuildDefaultBatch(Context.GetActualSchema()->GetIndexInfo().ArrowSchema(), 1, true); @@ -89,20 +103,16 @@ void TBuildBatchesTask::DoExecute(const std::shared_ptr& /*taskPtr*/) { } case NEvWrite::EModificationType::Replace: case NEvWrite::EModificationType::Delete: { - if (!Context.GetNoTxWrite()) { - std::shared_ptr task = - std::make_shared(std::move(WriteData), batch.GetContainer(), Context); - NConveyorComposite::TInsertServiceOperator::SendTaskToExecute(task); + auto proceed = handleReplace(); + if (proceed) { + break; } else { - NActors::TActivationContext::ActorSystem()->Send(Context.GetBufferizationPortionsActorId(), - new NWritingPortions::TEvAddInsertedDataToBuffer( - std::make_shared(WriteData), batch, std::make_shared(Context))); + return; } - return; } } std::shared_ptr task = - std::make_shared(std::move(WriteData), merger, batch, Context); + std::make_shared(std::move(WriteData), merger, batch, Context, readOnlyConflicts); NActors::TActivationContext::AsActorContext().Register(new NDataReader::TActor(task)); } diff --git a/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp b/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp index e9c3085117d4..4ca1c82b5ad7 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/restore.cpp @@ -10,7 +10,7 @@ namespace NKikimr::NOlap { std::unique_ptr TModificationRestoreTask::DoBuildRequestInitiator() const { const auto& writeMetaData = WriteData.GetWriteMeta(); - auto request = std::make_unique(writeMetaData.GetPathId(), Context.GetApplyToSnapshot(), Context.GetLockId()); + auto request = std::make_unique(writeMetaData.GetPathId(), Context.GetApplyToSnapshot(), Context.GetLockId(), ReadOnlyConflicts); request->TaskIdentifier = GetTaskId(); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "restore_start")( "count", IncomingData.HasContainer() ? IncomingData->num_rows() : 0)("task_id", WriteData.GetWriteMeta().GetId()); @@ -61,13 +61,14 @@ NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() { return TConclusionStatus::Success(); } -TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr& merger, const NArrow::TContainerWithIndexes& incomingData, const TWritingContext& context) +TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr& merger, const NArrow::TContainerWithIndexes& incomingData, const TWritingContext& context, const bool readOnlyConflicts) : TBase(context.GetTabletId(), context.GetTabletActorId(), writeData.GetWriteMeta().GetId() + "::" + ::ToString(writeData.GetWriteMeta().GetWriteId())) , WriteData(std::move(writeData)) , Merger(merger) , IncomingData(incomingData) - , Context(context) { + , Context(context) + , ReadOnlyConflicts(readOnlyConflicts) { AFL_VERIFY(context.GetApplyToSnapshot().Valid()); } diff --git a/ydb/core/tx/columnshard/operations/batch_builder/restore.h b/ydb/core/tx/columnshard/operations/batch_builder/restore.h index d42ead49093d..087ec560e14c 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/restore.h +++ b/ydb/core/tx/columnshard/operations/batch_builder/restore.h @@ -16,6 +16,7 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn std::shared_ptr Merger; NArrow::TContainerWithIndexes IncomingData; const TWritingContext Context; + bool ReadOnlyConflicts; virtual std::unique_ptr DoBuildRequestInitiator() const override; virtual TConclusionStatus DoOnDataChunk(const std::shared_ptr& data) override; @@ -35,7 +36,7 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn virtual TDuration GetTimeout() const override; TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr& merger, - const NArrow::TContainerWithIndexes& incomingData, const TWritingContext& context); + const NArrow::TContainerWithIndexes& incomingData, const TWritingContext& context, const bool readOnlyConflicts); }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/operations/common/context.h b/ydb/core/tx/columnshard/operations/common/context.h index 5f2cf6633c0f..73fad0daaa4f 100644 --- a/ydb/core/tx/columnshard/operations/common/context.h +++ b/ydb/core/tx/columnshard/operations/common/context.h @@ -46,6 +46,7 @@ class TWritingContext { YDB_READONLY_DEF(std::shared_ptr, WritingCounters); YDB_READONLY(TSnapshot, ApplyToSnapshot, TSnapshot::Zero()); YDB_READONLY_DEF(std::optional, LockId); + YDB_READONLY_DEF(std::optional, LockMode); const std::shared_ptr ActivityChecker; YDB_READONLY(bool, NoTxWrite, false); YDB_READONLY(bool, IsBulk, false); @@ -63,14 +64,18 @@ class TWritingContext { bool IsActive() const { return ActivityChecker->IsActive(); } - + + bool IsSnapshotIsolated() const { + return LockId.has_value() && LockMode.value_or(NKikimrDataEvents::OPTIMISTIC) == NKikimrDataEvents::OPTIMISTIC_SNAPSHOT_ISOLATION; + } + TString GetErrorMessage() const { return ActivityChecker->GetErrorMessage(); } TWritingContext(const ui64 tabletId, const NActors::TActorId& tabletActorId, const std::shared_ptr& actualSchema, const std::shared_ptr& operators, const std::shared_ptr& splitterCounters, - const std::shared_ptr& writingCounters, const TSnapshot& applyToSnapshot, const std::optional& lockId, + const std::shared_ptr& writingCounters, const TSnapshot& applyToSnapshot, const std::optional& lockId, const std::optional& lockMode, const std::shared_ptr& activityChecker, const bool noTxWrite, const NActors::TActorId& bufferizationPortionsActorId, const bool isBulk) : TabletId(tabletId) @@ -82,6 +87,7 @@ class TWritingContext { , WritingCounters(writingCounters) , ApplyToSnapshot(applyToSnapshot) , LockId(lockId) + , LockMode(lockMode) , ActivityChecker(activityChecker) , NoTxWrite(noTxWrite) , IsBulk(isBulk) diff --git a/ydb/core/tx/columnshard/tablet/write_queue.cpp b/ydb/core/tx/columnshard/tablet/write_queue.cpp index a5e91074012c..7165b4cc051c 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.cpp +++ b/ydb/core/tx/columnshard/tablet/write_queue.cpp @@ -23,7 +23,7 @@ bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& /* ctx */) co writeOperation->SetBehaviour(Behaviour); const auto& applyToMvccSnapshot = MvccSnapshot.Valid() ? MvccSnapshot : NOlap::TSnapshot::Max(); NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager, - owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, applyToMvccSnapshot, LockId, + owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, applyToMvccSnapshot, LockId, LockMode, writeOperation->GetActivityChecker(), Behaviour == EOperationBehaviour::NoTxWrite, owner->BufferizationPortionsWriteActorId, IsBulk); // We don't need to split here portions by the last level // ArrowData->SetSeparationPoints(owner->GetIndexAs().GetGranulePtrVerified(PathId.InternalPathId)->GetBucketPositions()); diff --git a/ydb/core/tx/columnshard/tablet/write_queue.h b/ydb/core/tx/columnshard/tablet/write_queue.h index d95900f14504..b1a9a69ae466 100644 --- a/ydb/core/tx/columnshard/tablet/write_queue.h +++ b/ydb/core/tx/columnshard/tablet/write_queue.h @@ -20,6 +20,7 @@ class TWriteTask: public TMoveOnly { const ui64 Cookie; const NOlap::TSnapshot MvccSnapshot; const ui64 LockId; + const NKikimrDataEvents::ELockMode LockMode; const NEvWrite::EModificationType ModificationType; const EOperationBehaviour Behaviour; const TMonotonic Created = TMonotonic::Now(); @@ -38,7 +39,7 @@ class TWriteTask: public TMoveOnly { } TWriteTask(const std::shared_ptr& arrowData, const NOlap::ISnapshotSchema::TPtr& schema, const NActors::TActorId sourceId, const NActors::TActorId recipientId, - const std::optional& granuleShardingVersionId, const TUnifiedPathId pathId, const ui64 cookie, const NOlap::TSnapshot& mvccSnapshot, const ui64 lockId, + const std::optional& granuleShardingVersionId, const TUnifiedPathId pathId, const ui64 cookie, const NOlap::TSnapshot& mvccSnapshot, const ui64 lockId, const NKikimrDataEvents::ELockMode lockMode, const NEvWrite::EModificationType modificationType, const EOperationBehaviour behaviour, const std::optional timeout, const ui64 txId, const bool isBulk, const std::optional& overloadSubscribeSeqNo) : ArrowData(arrowData) , Schema(schema) @@ -49,6 +50,7 @@ class TWriteTask: public TMoveOnly { , Cookie(cookie) , MvccSnapshot(mvccSnapshot) , LockId(lockId) + , LockMode(lockMode) , ModificationType(modificationType) , Behaviour(behaviour) , Timeout(timeout)