From d0c52929403d09d41ec7628eb43482c13afd96c9 Mon Sep 17 00:00:00 2001 From: xyliganSereja Date: Wed, 8 Oct 2025 23:28:55 +0300 Subject: [PATCH 1/4] added --- .../common/thread_safe_optional.cpp | 6 ++ .../columnshard/common/thread_safe_optional.h | 96 +++++++++++++++++++ ydb/core/tx/columnshard/common/ya.make | 1 + .../engines/portions/constructor_portion.cpp | 4 +- .../engines/portions/portion_info.cpp | 14 ++- .../engines/portions/portion_info.h | 20 ++-- 6 files changed, 126 insertions(+), 15 deletions(-) create mode 100644 ydb/core/tx/columnshard/common/thread_safe_optional.cpp create mode 100644 ydb/core/tx/columnshard/common/thread_safe_optional.h diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.cpp b/ydb/core/tx/columnshard/common/thread_safe_optional.cpp new file mode 100644 index 000000000000..15e225e72083 --- /dev/null +++ b/ydb/core/tx/columnshard/common/thread_safe_optional.cpp @@ -0,0 +1,6 @@ +#include "thread_safe_optional.h" + +namespace NKikimr::NOlap { + +} + diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.h b/ydb/core/tx/columnshard/common/thread_safe_optional.h new file mode 100644 index 000000000000..8625132fa127 --- /dev/null +++ b/ydb/core/tx/columnshard/common/thread_safe_optional.h @@ -0,0 +1,96 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NOlap { + +template +class TThreadSafeOptional { +private: + alignas(T) unsigned char Storage[sizeof(T)]; + std::atomic Defined{ false }; + + T *Ptr() { + return reinterpret_cast(&Storage[0]); + } + + const T *Ptr() const { + return reinterpret_cast(&Storage[0]); + } + +public: + TThreadSafeOptional() = default; + + ~TThreadSafeOptional() { + if (Has()) { + Ptr()->~T(); + } + } + + TThreadSafeOptional(const TThreadSafeOptional& other) { + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(*other.Ptr()); + Defined.store(true, std::memory_order_release); + } + } + + TThreadSafeOptional& operator=(const TThreadSafeOptional& other) = delete; + + TThreadSafeOptional(TThreadSafeOptional&& other) noexcept { + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(std::move(*other.Ptr())); + Defined.store(true, std::memory_order_release); + } + } + + TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept { + if (this == &other) { + return *this; + } + + AFL_VERIFY(!Has()); + const bool has = other.Defined.load(std::memory_order_acquire); + if (has) { + ::new (Ptr()) T(std::move(*other.Ptr())); + Defined.store(true, std::memory_order_release); + } + + return *this; + } + + void Reset() = delete; + + void Set(const T& value) { + AFL_VERIFY(!Has()); + ::new (Ptr()) T(value); + Defined.store(true, std::memory_order_release); + } + + void Set(T&& value) { + AFL_VERIFY(!Has()); + ::new (Ptr()) T(std::move(value)); + Defined.store(true, std::memory_order_release); + } + + bool Has() const { + return Defined.load(std::memory_order_acquire); + } + + const T& Get() const { + return *Ptr(); + } + + std::optional GetOptional() const { + if (Has()) { + return *Ptr(); + } + + return {}; + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 86c71afb41ba..8b3f40838e3b 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -7,6 +7,7 @@ SRCS( snapshot.cpp portion.cpp tablet_id.cpp + thread_safe_optional.cpp blob.cpp volume.cpp path_id.cpp diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp index 38c4c1372939..cb9e61cd15fb 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp @@ -29,12 +29,14 @@ std::shared_ptr TPortionInfoConstructor::Build() { if (RemoveSnapshot) { AFL_VERIFY(RemoveSnapshot->Valid()); - result->RemoveSnapshot = *RemoveSnapshot; + result->SetRemoveSnapshot(*RemoveSnapshot); } + AFL_VERIFY(SchemaVersion && *SchemaVersion); result->SchemaVersion = *SchemaVersion; result->ShardingVersion = ShardingVersion; } + static TAtomicCounter countValues = 0; static TAtomicCounter sumValues = 0; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_size", result->GetMemorySize())("data_size", result->GetDataSize())( diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index b827d6e19472..14f91a801893 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -38,8 +38,8 @@ TString TPortionInfo::DebugString(const bool withDetails) const { sb << "column_size:" << GetColumnBlobBytes() << ";" << "index_size:" << GetIndexBlobBytes() << ";" << "meta:(" << Meta.DebugString() << ");"; - if (RemoveSnapshot.Valid()) { - sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; + if (HasRemoveSnapshot()) { + sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");"; } return sb << ")"; } @@ -56,8 +56,8 @@ void TPortionInfo::SerializeToProto(const std::vector& blobIds, PathId.ToProto(proto); proto.SetPortionId(PortionId); proto.SetSchemaVersion(GetSchemaVersionVerified()); - if (!RemoveSnapshot.IsZero()) { - *proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto(); + if (HasRemoveSnapshot()) { + *proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto(); } *proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced()); @@ -71,11 +71,15 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat return TConclusionStatus::Fail("portion's schema version cannot been equals to zero"); } if (proto.HasRemoveSnapshot()) { - auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot()); + TSnapshot tmp = TSnapshot::Zero(); + auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot()); if (!parse) { return parse; } + + RemoveSnapshot.Set(std::move(tmp)); } + return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 81d2ade7c328..b4ce802d4d08 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -14,6 +15,7 @@ #include + namespace NKikimrColumnShardDataSharingProto { class TPortionInfo; } @@ -89,7 +91,7 @@ class TPortionInfo { TInternalPathId PathId; ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId - TSnapshot RemoveSnapshot = TSnapshot::Zero(); + TThreadSafeOptional RemoveSnapshot; ui64 SchemaVersion = 0; std::optional ShardingVersion; @@ -213,8 +215,8 @@ class TPortionInfo { } void SetRemoveSnapshot(const TSnapshot& snap) { - AFL_VERIFY(!RemoveSnapshot.Valid()); - RemoveSnapshot = snap; + AFL_VERIFY(!HasRemoveSnapshot()); + RemoveSnapshot.Set(snap); } void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { @@ -339,7 +341,7 @@ class TPortionInfo { TString DebugString(const bool withDetails = false) const; bool HasRemoveSnapshot() const { - return RemoveSnapshot.Valid(); + return RemoveSnapshot.Has(); } bool IsRemovedFor(const TSnapshot& snapshot) const { @@ -376,12 +378,12 @@ class TPortionInfo { const TSnapshot& GetRemoveSnapshotVerified() const { AFL_VERIFY(HasRemoveSnapshot()); - return RemoveSnapshot; + return RemoveSnapshot.Get(); } std::optional GetRemoveSnapshotOptional() const { - if (RemoveSnapshot.Valid()) { - return RemoveSnapshot; + if (HasRemoveSnapshot()) { + return RemoveSnapshot.Get(); } else { return {}; } @@ -392,8 +394,8 @@ class TPortionInfo { return SchemaVersion; } - bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot) const { - const bool visible = (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot) && DoIsVisible(snapshot, checkCommitSnapshot); + bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const { + const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot); AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)( "snapshot", snapshot.DebugString()); From 312169315f40d2120929aa2020ae97eaece6fd19 Mon Sep 17 00:00:00 2001 From: Matveev Sergei Date: Fri, 10 Oct 2025 13:31:27 +0000 Subject: [PATCH 2/4] Revert "added" This reverts commit d0c52929403d09d41ec7628eb43482c13afd96c9. --- .../common/thread_safe_optional.cpp | 6 -- .../columnshard/common/thread_safe_optional.h | 96 ------------------- ydb/core/tx/columnshard/common/ya.make | 1 - .../engines/portions/constructor_portion.cpp | 4 +- .../engines/portions/portion_info.cpp | 14 +-- .../engines/portions/portion_info.h | 20 ++-- 6 files changed, 15 insertions(+), 126 deletions(-) delete mode 100644 ydb/core/tx/columnshard/common/thread_safe_optional.cpp delete mode 100644 ydb/core/tx/columnshard/common/thread_safe_optional.h diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.cpp b/ydb/core/tx/columnshard/common/thread_safe_optional.cpp deleted file mode 100644 index 15e225e72083..000000000000 --- a/ydb/core/tx/columnshard/common/thread_safe_optional.cpp +++ /dev/null @@ -1,6 +0,0 @@ -#include "thread_safe_optional.h" - -namespace NKikimr::NOlap { - -} - diff --git a/ydb/core/tx/columnshard/common/thread_safe_optional.h b/ydb/core/tx/columnshard/common/thread_safe_optional.h deleted file mode 100644 index 8625132fa127..000000000000 --- a/ydb/core/tx/columnshard/common/thread_safe_optional.h +++ /dev/null @@ -1,96 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace NKikimr::NOlap { - -template -class TThreadSafeOptional { -private: - alignas(T) unsigned char Storage[sizeof(T)]; - std::atomic Defined{ false }; - - T *Ptr() { - return reinterpret_cast(&Storage[0]); - } - - const T *Ptr() const { - return reinterpret_cast(&Storage[0]); - } - -public: - TThreadSafeOptional() = default; - - ~TThreadSafeOptional() { - if (Has()) { - Ptr()->~T(); - } - } - - TThreadSafeOptional(const TThreadSafeOptional& other) { - const bool has = other.Defined.load(std::memory_order_acquire); - if (has) { - ::new (Ptr()) T(*other.Ptr()); - Defined.store(true, std::memory_order_release); - } - } - - TThreadSafeOptional& operator=(const TThreadSafeOptional& other) = delete; - - TThreadSafeOptional(TThreadSafeOptional&& other) noexcept { - const bool has = other.Defined.load(std::memory_order_acquire); - if (has) { - ::new (Ptr()) T(std::move(*other.Ptr())); - Defined.store(true, std::memory_order_release); - } - } - - TThreadSafeOptional& operator=(TThreadSafeOptional&& other) noexcept { - if (this == &other) { - return *this; - } - - AFL_VERIFY(!Has()); - const bool has = other.Defined.load(std::memory_order_acquire); - if (has) { - ::new (Ptr()) T(std::move(*other.Ptr())); - Defined.store(true, std::memory_order_release); - } - - return *this; - } - - void Reset() = delete; - - void Set(const T& value) { - AFL_VERIFY(!Has()); - ::new (Ptr()) T(value); - Defined.store(true, std::memory_order_release); - } - - void Set(T&& value) { - AFL_VERIFY(!Has()); - ::new (Ptr()) T(std::move(value)); - Defined.store(true, std::memory_order_release); - } - - bool Has() const { - return Defined.load(std::memory_order_acquire); - } - - const T& Get() const { - return *Ptr(); - } - - std::optional GetOptional() const { - if (Has()) { - return *Ptr(); - } - - return {}; - } -}; - -} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make index 8b3f40838e3b..86c71afb41ba 100644 --- a/ydb/core/tx/columnshard/common/ya.make +++ b/ydb/core/tx/columnshard/common/ya.make @@ -7,7 +7,6 @@ SRCS( snapshot.cpp portion.cpp tablet_id.cpp - thread_safe_optional.cpp blob.cpp volume.cpp path_id.cpp diff --git a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp index cb9e61cd15fb..38c4c1372939 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor_portion.cpp @@ -29,14 +29,12 @@ std::shared_ptr TPortionInfoConstructor::Build() { if (RemoveSnapshot) { AFL_VERIFY(RemoveSnapshot->Valid()); - result->SetRemoveSnapshot(*RemoveSnapshot); + result->RemoveSnapshot = *RemoveSnapshot; } - AFL_VERIFY(SchemaVersion && *SchemaVersion); result->SchemaVersion = *SchemaVersion; result->ShardingVersion = ShardingVersion; } - static TAtomicCounter countValues = 0; static TAtomicCounter sumValues = 0; AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_size", result->GetMemorySize())("data_size", result->GetDataSize())( diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 14f91a801893..b827d6e19472 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -38,8 +38,8 @@ TString TPortionInfo::DebugString(const bool withDetails) const { sb << "column_size:" << GetColumnBlobBytes() << ";" << "index_size:" << GetIndexBlobBytes() << ";" << "meta:(" << Meta.DebugString() << ");"; - if (HasRemoveSnapshot()) { - sb << "remove_snapshot:(" << RemoveSnapshot.Get().DebugString() << ");"; + if (RemoveSnapshot.Valid()) { + sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; } return sb << ")"; } @@ -56,8 +56,8 @@ void TPortionInfo::SerializeToProto(const std::vector& blobIds, PathId.ToProto(proto); proto.SetPortionId(PortionId); proto.SetSchemaVersion(GetSchemaVersionVerified()); - if (HasRemoveSnapshot()) { - *proto.MutableRemoveSnapshot() = RemoveSnapshot.Get().SerializeToProto(); + if (!RemoveSnapshot.IsZero()) { + *proto.MutableRemoveSnapshot() = RemoveSnapshot.SerializeToProto(); } *proto.MutableMeta() = Meta.SerializeToProto(blobIds, GetProduced()); @@ -71,15 +71,11 @@ TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDat return TConclusionStatus::Fail("portion's schema version cannot been equals to zero"); } if (proto.HasRemoveSnapshot()) { - TSnapshot tmp = TSnapshot::Zero(); - auto parse = tmp.DeserializeFromProto(proto.GetRemoveSnapshot()); + auto parse = RemoveSnapshot.DeserializeFromProto(proto.GetRemoveSnapshot()); if (!parse) { return parse; } - - RemoveSnapshot.Set(std::move(tmp)); } - return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index b4ce802d4d08..81d2ade7c328 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -7,7 +7,6 @@ #include #include #include -#include #include #include @@ -15,7 +14,6 @@ #include - namespace NKikimrColumnShardDataSharingProto { class TPortionInfo; } @@ -91,7 +89,7 @@ class TPortionInfo { TInternalPathId PathId; ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId - TThreadSafeOptional RemoveSnapshot; + TSnapshot RemoveSnapshot = TSnapshot::Zero(); ui64 SchemaVersion = 0; std::optional ShardingVersion; @@ -215,8 +213,8 @@ class TPortionInfo { } void SetRemoveSnapshot(const TSnapshot& snap) { - AFL_VERIFY(!HasRemoveSnapshot()); - RemoveSnapshot.Set(snap); + AFL_VERIFY(!RemoveSnapshot.Valid()); + RemoveSnapshot = snap; } void SetRemoveSnapshot(const ui64 planStep, const ui64 txId) { @@ -341,7 +339,7 @@ class TPortionInfo { TString DebugString(const bool withDetails = false) const; bool HasRemoveSnapshot() const { - return RemoveSnapshot.Has(); + return RemoveSnapshot.Valid(); } bool IsRemovedFor(const TSnapshot& snapshot) const { @@ -378,12 +376,12 @@ class TPortionInfo { const TSnapshot& GetRemoveSnapshotVerified() const { AFL_VERIFY(HasRemoveSnapshot()); - return RemoveSnapshot.Get(); + return RemoveSnapshot; } std::optional GetRemoveSnapshotOptional() const { - if (HasRemoveSnapshot()) { - return RemoveSnapshot.Get(); + if (RemoveSnapshot.Valid()) { + return RemoveSnapshot; } else { return {}; } @@ -394,8 +392,8 @@ class TPortionInfo { return SchemaVersion; } - bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot = true) const { - const bool visible = (!HasRemoveSnapshot() || snapshot < GetRemoveSnapshotVerified()) && DoIsVisible(snapshot, checkCommitSnapshot); + bool IsVisible(const TSnapshot& snapshot, const bool checkCommitSnapshot) const { + const bool visible = (!RemoveSnapshot.Valid() || snapshot < RemoveSnapshot) && DoIsVisible(snapshot, checkCommitSnapshot); AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)( "snapshot", snapshot.DebugString()); From 4256c83eeb1b01b42a318c09e254c31b70ab73cc Mon Sep 17 00:00:00 2001 From: xyliganSereja Date: Fri, 10 Oct 2025 17:46:08 +0300 Subject: [PATCH 3/4] fix counters --- ydb/core/tx/columnshard/engines/reader/actor/actor.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index 9793f1f16c6f..9ee2bd5a730f 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -105,6 +105,7 @@ void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResu } void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev) { + auto ackProcessingGuard = ScanCountersPool.GetResultsForReplyGuard(); StartWaitTime = TInstant::Now(); auto g = Stats->MakeGuard("ack", IS_INFO_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN)); @@ -180,7 +181,7 @@ void TColumnShardScan::CheckHanging(const bool logging) const { "debug", ScanIterator ? ScanIterator->DebugString() : Default())("last", LastResultInstant); } const bool ok = !!FinishInstant || !ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting(); - AFL_VERIFY_DEBUG(ok) + AFL_VERIFY(ok) ("finished", ScanIterator->Finished()) ("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)("debug", ScanIterator->DebugString())( "counters", ScanCountersPool.DebugString()); From 8241075f4ee046f750bb1ac4f2d51c7f50177d27 Mon Sep 17 00:00:00 2001 From: Matveev Sergei Date: Fri, 10 Oct 2025 15:25:49 +0000 Subject: [PATCH 4/4] verify to verify debug --- ydb/core/tx/columnshard/engines/reader/actor/actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index 9ee2bd5a730f..11d73e13a0b1 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -181,7 +181,7 @@ void TColumnShardScan::CheckHanging(const bool logging) const { "debug", ScanIterator ? ScanIterator->DebugString() : Default())("last", LastResultInstant); } const bool ok = !!FinishInstant || !ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting(); - AFL_VERIFY(ok) + AFL_VERIFY_DEBUG(ok) ("finished", ScanIterator->Finished()) ("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)("debug", ScanIterator->DebugString())( "counters", ScanCountersPool.DebugString());