Skip to content

Commit 8cc8229

Browse files
authored
Simplify snapshot and lock passing for writes (#24328)
1 parent 6fef68d commit 8cc8229

File tree

10 files changed

+12
-31
lines changed

10 files changed

+12
-31
lines changed

ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
5252
read.TableMetadataAccessor = accConclusion.DetachResult();
5353
}
5454
}
55-
read.LockId = LockId;
55+
read.LockId = request.GetLockId();
5656
read.DeduplicationPolicy = EDeduplicationPolicy::PREVENT_DUPLICATES;
5757
std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(context));
5858
read.ColumnIds = request.GetColumnIds();
@@ -90,7 +90,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
9090

9191
const ui64 requestCookie = Self->InFlightReadsTracker.AddInFlightRequest(readMetadataRange, index);
9292
auto scanActorId = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, Self->GetStoragesManager(),
93-
Self->DataAccessorsManager.GetObjectPtrVerified(), Self->ColumnDataManager.GetObjectPtrVerified(), TComputeShardingPolicy(), ScanId, LockId.value_or(0), ScanGen, requestCookie,
93+
Self->DataAccessorsManager.GetObjectPtrVerified(), Self->ColumnDataManager.GetObjectPtrVerified(), TComputeShardingPolicy(), ScanId, request.GetLockId().value_or(0), ScanGen, requestCookie,
9494
Self->TabletID(), TDuration::Max(), readMetadataRange, NKikimrDataEvents::FORMAT_ARROW, Self->Counters.GetScanCounters(), {}));
9595

9696
Self->InFlightReadsTracker.AddScanActorId(requestCookie, scanActorId);

ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ namespace NKikimr::NOlap::NReader {
77
class TTxInternalScan: public NTabletFlatExecutor::TTransactionBase<NColumnShard::TColumnShard> {
88
private:
99
using TBase = NTabletFlatExecutor::TTransactionBase<NColumnShard::TColumnShard>;
10-
TEvColumnShard::TEvInternalScan::TPtr InternalScanEvent;
10+
const TEvColumnShard::TEvInternalScan::TPtr InternalScanEvent;
1111
const ui32 ScanGen = 1;
1212
const ui32 ScanId = 1;
13-
const std::optional<ui64> LockId;
1413
void SendError(const TString& problem, const TString& details, const TActorContext& ctx) const;
1514

1615
public:
@@ -19,7 +18,6 @@ class TTxInternalScan: public NTabletFlatExecutor::TTransactionBase<NColumnShard
1918
TTxInternalScan(NColumnShard::TColumnShard* self, TEvColumnShard::TEvInternalScan::TPtr& ev)
2019
: TBase(self)
2120
, InternalScanEvent(ev)
22-
, LockId(InternalScanEvent->Get()->GetLockId())
2321
{
2422
}
2523

ydb/core/tx/columnshard/operations/batch_builder/builder.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ void TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
102102
}
103103
}
104104
std::shared_ptr<NDataReader::IRestoreTask> task =
105-
std::make_shared<NOlap::TModificationRestoreTask>(std::move(WriteData), merger, ActualSnapshot, batch, Context);
105+
std::make_shared<NOlap::TModificationRestoreTask>(std::move(WriteData), merger, batch, Context);
106106
NActors::TActivationContext::AsActorContext().Register(new NDataReader::TActor(task));
107107
}
108108

ydb/core/tx/columnshard/operations/batch_builder/builder.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace NKikimr::NOlap {
1212
class TBuildBatchesTask: public NConveyor::ITask, public NColumnShard::TMonitoringObjectsCounter<TBuildBatchesTask> {
1313
private:
1414
NEvWrite::TWriteData WriteData;
15-
const TSnapshot ActualSnapshot;
1615
const TWritingContext Context;
1716
void ReplyError(const TString& message, const NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass errorClass);
1817

@@ -26,7 +25,6 @@ class TBuildBatchesTask: public NConveyor::ITask, public NColumnShard::TMonitori
2625

2726
TBuildBatchesTask(NEvWrite::TWriteData&& writeData, const TWritingContext& context)
2827
: WriteData(std::move(writeData))
29-
, ActualSnapshot(context.GetApplyToSnapshot())
3028
, Context(context) {
3129
WriteData.MutableWriteMeta().OnStage(NEvWrite::EWriteStage::BuildBatch);
3230
}

ydb/core/tx/columnshard/operations/batch_builder/restore.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace NKikimr::NOlap {
1010

1111
std::unique_ptr<TEvColumnShard::TEvInternalScan> TModificationRestoreTask::DoBuildRequestInitiator() const {
1212
const auto& writeMetaData = WriteData.GetWriteMeta();
13-
auto request = std::make_unique<TEvColumnShard::TEvInternalScan>(writeMetaData.GetPathId(), Snapshot, writeMetaData.GetLockIdOptional());
13+
auto request = std::make_unique<TEvColumnShard::TEvInternalScan>(writeMetaData.GetPathId(), Context.GetApplyToSnapshot(), Context.GetLockId());
1414
request->TaskIdentifier = GetTaskId();
1515
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_RESTORE)("event", "restore_start")(
1616
"count", IncomingData.HasContainer() ? IncomingData->num_rows() : 0)("task_id", WriteData.GetWriteMeta().GetId());
@@ -61,16 +61,14 @@ NKikimr::TConclusionStatus TModificationRestoreTask::DoOnFinished() {
6161
return TConclusionStatus::Success();
6262
}
6363

64-
TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger,
65-
const TSnapshot actualSnapshot, const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incomingData, const TWritingContext& context)
64+
TModificationRestoreTask::TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger, const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incomingData, const TWritingContext& context)
6665
: TBase(context.GetTabletId(), context.GetTabletActorId(),
6766
writeData.GetWriteMeta().GetId() + "::" + ::ToString(writeData.GetWriteMeta().GetWriteId()))
6867
, WriteData(std::move(writeData))
6968
, Merger(merger)
70-
, Snapshot(actualSnapshot)
7169
, IncomingData(incomingData)
7270
, Context(context) {
73-
AFL_VERIFY(actualSnapshot.Valid());
71+
AFL_VERIFY(context.GetApplyToSnapshot().Valid());
7472
}
7573

7674
void TModificationRestoreTask::SendErrorMessage(

ydb/core/tx/columnshard/operations/batch_builder/restore.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn
1414
using TBase = NDataReader::IRestoreTask;
1515
NEvWrite::TWriteData WriteData;
1616
std::shared_ptr<IMerger> Merger;
17-
const TSnapshot Snapshot;
1817
NArrow::TContainerWithIndexes<arrow::RecordBatch> IncomingData;
1918
const TWritingContext Context;
2019
virtual std::unique_ptr<TEvColumnShard::TEvInternalScan> DoBuildRequestInitiator() const override;
@@ -31,7 +30,7 @@ class TModificationRestoreTask: public NDataReader::IRestoreTask, public NColumn
3130

3231
virtual TDuration GetTimeout() const override;
3332

34-
TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger, const TSnapshot actualSnapshot,
33+
TModificationRestoreTask(NEvWrite::TWriteData&& writeData, const std::shared_ptr<IMerger>& merger,
3534
const NArrow::TContainerWithIndexes<arrow::RecordBatch>& incomingData, const TWritingContext& context);
3635
};
3736

ydb/core/tx/columnshard/operations/common/context.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class TWritingContext {
1616
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TSplitterCounters>, SplitterCounters);
1717
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TWriteCounters>, WritingCounters);
1818
YDB_READONLY(TSnapshot, ApplyToSnapshot, TSnapshot::Zero());
19+
YDB_READONLY_DEF(std::optional<ui64>, LockId);
1920
const std::shared_ptr<const TAtomicCounter> ActivityChecker;
2021
YDB_READONLY(bool, NoTxWrite, false);
2122

@@ -35,7 +36,7 @@ class TWritingContext {
3536

3637
TWritingContext(const ui64 tabletId, const NActors::TActorId& tabletActorId, const std::shared_ptr<ISnapshotSchema>& actualSchema,
3738
const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<NColumnShard::TSplitterCounters>& splitterCounters,
38-
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot,
39+
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot, const std::optional<ui64>& lockId,
3940
const std::shared_ptr<const TAtomicCounter>& activityChecker, const bool noTxWrite, const NActors::TActorId& bufferizationPortionsActorId)
4041
: TabletId(tabletId)
4142
, BufferizationPortionsActorId(bufferizationPortionsActorId)
@@ -45,6 +46,7 @@ class TWritingContext {
4546
, SplitterCounters(splitterCounters)
4647
, WritingCounters(writingCounters)
4748
, ApplyToSnapshot(applyToSnapshot)
49+
, LockId(lockId)
4850
, ActivityChecker(activityChecker)
4951
, NoTxWrite(noTxWrite)
5052
{

ydb/core/tx/columnshard/operations/write.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ void TWriteOperation::Start(
3636
auto writeMeta = std::make_shared<NEvWrite::TWriteMeta>(
3737
(ui64)WriteId, PathId, source, GranuleShardingVersionId, GetIdentifier(),
3838
context.GetWritingCounters()->GetWriteFlowCounters());
39-
writeMeta->SetLockId(LockId);
4039
writeMeta->SetModificationType(ModificationType);
4140
writeMeta->SetBulk(IsBulk());
4241
auto writingAction = owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR);

ydb/core/tx/columnshard/tablet/write_queue.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ bool TWriteTask::Execute(TColumnShard* owner, const TActorContext& /* ctx */) co
2222
writeOperation->SetBehaviour(Behaviour);
2323
const auto& applyToMvccSnapshot = MvccSnapshot.Valid() ? MvccSnapshot : NOlap::TSnapshot::Max();
2424
NOlap::TWritingContext wContext(owner->TabletID(), owner->SelfId(), Schema, owner->StoragesManager,
25-
owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, applyToMvccSnapshot,
25+
owner->Counters.GetIndexationCounters().SplitterCounters, owner->Counters.GetCSCounters().WritingCounters, applyToMvccSnapshot, LockId,
2626
writeOperation->GetActivityChecker(), Behaviour == EOperationBehaviour::NoTxWrite, owner->BufferizationPortionsWriteActorId);
2727
// We don't need to split here portions by the last level
2828
// ArrowData->SetSeparationPoints(owner->GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(PathId.InternalPathId)->GetBucketPositions());

ydb/core/tx/data_events/write_data.h

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ class TWriteMeta: public NColumnShard::TMonitoringObjectsCounter<TWriteMeta>, TN
4949

5050
YDB_ACCESSOR(EModificationType, ModificationType, EModificationType::Replace);
5151
YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now());
52-
std::optional<ui64> LockId;
5352
const std::shared_ptr<TWriteFlowCounters> Counters;
5453
mutable NOlap::NCounters::TStateSignalsOperator<NEvWrite::EWriteStage>::TGuard StateGuard;
5554

@@ -64,18 +63,6 @@ class TWriteMeta: public NColumnShard::TMonitoringObjectsCounter<TWriteMeta>, TN
6463
}
6564
}
6665

67-
void SetLockId(const ui64 lockId) {
68-
LockId = lockId;
69-
}
70-
71-
ui64 GetLockIdVerified() const {
72-
AFL_VERIFY(LockId);
73-
return *LockId;
74-
}
75-
76-
std::optional<ui64> GetLockIdOptional() const {
77-
return LockId;
78-
}
7966

8067
bool IsGuaranteeWriter() const {
8168
switch (ModificationType) {

0 commit comments

Comments
 (0)