From dde224bafabafd37db4f2969e56b2118eae6ffe1 Mon Sep 17 00:00:00 2001 From: Vitalii Gridnev Date: Wed, 29 Oct 2025 19:02:04 +0300 Subject: [PATCH] make sure that seqno is unique among all tasks --- .../kqp/runtime/kqp_stream_lookup_actor.cpp | 2 +- .../kqp/runtime/kqp_stream_lookup_worker.cpp | 17 +++++++++++++---- ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 1 + 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 06e3bc01d0d9..9fcca85c91da 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -46,7 +46,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) , LookupStrategy(settings.GetLookupStrategy()) - , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc)) + , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TaskId, args.TypeEnv, args.HolderFactory, args.InputDesc)) , Counters(counters) , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor") { diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index 181a0c7b4df8..445725e487b9 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -16,6 +16,8 @@ namespace NKikimr { namespace NKqp { constexpr ui64 MAX_IN_FLIGHT_LIMIT = 500; +constexpr ui64 SEQNO_SPACE = 40; +constexpr ui64 MaxTaskId = (1ULL << (64 - SEQNO_SPACE)); namespace { std::vector> GetRangePartitioning(const TKqpStreamLookupWorker::TPartitionInfo& partitionInfo, @@ -498,11 +500,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { class TKqpJoinRows : public TKqpStreamLookupWorker { public: - TKqpJoinRows(TLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, + TKqpJoinRows(TLookupSettings&& settings, ui64 taskId, + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) - , InputDesc(inputDesc) { - + , InputDesc(inputDesc) + , InputRowSeqNo(taskId << SEQNO_SPACE) + , InputRowSeqNoLast((taskId + 1) << SEQNO_SPACE) + { + YQL_ENSURE(taskId < MaxTaskId); // read columns should contain join key and result columns for (auto joinKey : Settings.LookupKeyColumns) { ReadColumns.emplace(joinKey->Name, *joinKey); @@ -537,6 +543,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { lastRow = cookie.LastRow; } else { rowSeqNo = InputRowSeqNo++; + YQL_ENSURE(InputRowSeqNo < InputRowSeqNoLast); } if (joinKey.HasValue()) { @@ -1193,6 +1200,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { absl::flat_hash_map PendingLeftRowsByKey; std::unordered_map ResultRowsBySeqNo; ui64 InputRowSeqNo = 0; + ui64 InputRowSeqNoLast = 0; ui64 JoinKeySeqNo = 0; ui64 CurrentResultSeqNo = 0; NMiniKQL::TStructType* LeftRowType = nullptr; @@ -1200,6 +1208,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { }; std::unique_ptr CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, + ui64 taskId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) { @@ -1250,7 +1259,7 @@ std::unique_ptr CreateStreamLookupWorker(NKikimrKqp::TKq return std::make_unique(std::move(preparedSettings), typeEnv, holderFactory); case NKqpProto::EStreamLookupStrategy::JOIN: case NKqpProto::EStreamLookupStrategy::SEMI_JOIN: - return std::make_unique(std::move(preparedSettings), typeEnv, holderFactory, inputDesc); + return std::make_unique(std::move(preparedSettings), taskId, typeEnv, holderFactory, inputDesc); default: return {}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index 2f0f8de27567..f0e8b8ba3b7c 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -91,6 +91,7 @@ class TKqpStreamLookupWorker { }; std::unique_ptr CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, + ui64 taskId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc);