Skip to content

Commit aa1cb3a

Browse files
block join with neumann ht and packed tuple format (#27450)
Co-authored-by: Mikhail Filitov <filitovme@gmail.com>
1 parent db27fa0 commit aa1cb3a

22 files changed

+651
-301
lines changed

ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -475,12 +475,12 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
475475
auto wideStreamComponentsSize = [](TRuntimeNode node)->int {
476476
return AS_TYPE(TMultiType, AS_TYPE(TStreamType,node.GetStaticType())->GetItemType())->GetElementsCount();
477477
};
478-
TDqRenames renames{};
478+
TDqUserRenames renames{};
479479
for(int index = 0; index < wideStreamComponentsSize(leftInput) - 1; ++index) {
480-
renames.emplace_back(index, JoinSide::kLeft);
480+
renames.emplace_back(index, EJoinSide::kLeft);
481481
}
482482
for(int index = 0; index < wideStreamComponentsSize(rightInput) - 1; ++index) {
483-
renames.emplace_back(index, JoinSide::kRight);
483+
renames.emplace_back(index, EJoinSide::kRight);
484484
}
485485
return TGraceJoinRenames::FromDq(renames);
486486
}();

ydb/core/kqp/tools/join_perf/construct_join_graph.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJ
8888

8989
const TVector<TType*> resultTypesArr = [&] {
9090
TVector<TType*> arr;
91-
TDqRenames dqRenames = FromGraceFormat(renames);
91+
TDqUserRenames dqRenames = FromGraceFormat(renames);
9292
for (auto rename : dqRenames) {
93-
if (rename.Side == JoinSide::kLeft) {
93+
if (rename.Side == EJoinSide::kLeft) {
9494
auto* resType = descr.LeftSource.ColumnTypes[rename.Index];
9595
arr.push_back([&] {
9696
if (ForceLeftOptional(joinKind) && !resType->IsOptional()) {

ydb/core/kqp/tools/join_perf/construct_join_graph.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ struct TJoinDescription {
1717
TJoinSourceData LeftSource;
1818
TJoinSourceData RightSource;
1919
TDqSetup<false>* Setup;
20-
std::optional<TDqRenames> CustomRenames;
20+
std::optional<TDqUserRenames> CustomRenames;
2121
};
2222

2323
bool IsBlockJoin(ETestedJoinAlgo algo);

ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp

Lines changed: 176 additions & 126 deletions
Large diffs are not rendered by default.

ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#pragma once
22
#include "type_utils.h"
33
#include <util/string/printf.h>
4+
#include <ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.h>
5+
#include <ydb/library/yql/dq/comp_nodes/hash_join_utils/neumann_hash_table.h>
46
#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
57

68
namespace NKikimr::NMiniKQL::NJoinTable {
@@ -28,7 +30,8 @@ class TStdJoinTable {
2830

2931
void Add(TSizedTuple tuple) {
3032
MKQL_ENSURE(BuiltTable.empty(), "JoinTable is built already");
31-
MKQL_ENSURE(std::ssize(tuple) == TupleSize, Sprintf("tuple size promise(%i) vs actual(%i) mismatch", TupleSize, std::ssize(tuple)));
33+
MKQL_ENSURE(std::ssize(tuple) == TupleSize,
34+
Sprintf("tuple size promise(%i) vs actual(%i) mismatch", TupleSize, std::ssize(tuple)));
3235
for (int idx = 0; idx < TupleSize; ++idx) {
3336
Tuples.push_back(tuple[idx]);
3437
}
@@ -81,4 +84,38 @@ class TStdJoinTable {
8184
BuiltTable;
8285
};
8386

87+
class TNeumannJoinTable : NNonCopyable::TMoveOnly {
88+
public:
89+
struct Tuple {
90+
const ui8* PackedData;
91+
const ui8* OverflowBegin;
92+
};
93+
94+
TNeumannJoinTable(const NPackedTuple::TTupleLayout* layout)
95+
: Table_(layout)
96+
{}
97+
98+
void BuildWith(IBlockLayoutConverter::TPackResult data) {
99+
BuildData_ = std::move(data);
100+
Table_.Build(BuildData_.PackedTuples.data(), BuildData_.Overflow.data(), BuildData_.NTuples);
101+
Built_ = true;
102+
}
103+
104+
bool Empty() {
105+
return Table_.Empty();
106+
}
107+
108+
void Lookup(Tuple row, std::invocable<Tuple> auto consume) const {
109+
MKQL_ENSURE(Built_, "table must be built before lookup");
110+
Table_.Apply(row.PackedData, row.OverflowBegin, [consume, this](const ui8* tuplePackedData) {
111+
consume(Tuple{tuplePackedData, BuildData_.Overflow.data()});
112+
});
113+
}
114+
115+
private:
116+
bool Built_ = false;
117+
IBlockLayoutConverter::TPackResult BuildData_;
118+
NKikimr::NMiniKQL::NPackedTuple::TNeumannHashTable<false, false> Table_;
119+
};
120+
84121
} // namespace NKikimr::NMiniKQL::NJoinTable

ydb/library/yql/dq/comp_nodes/dq_join_common.h

Lines changed: 150 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include "dq_hash_join_table.h"
33
#include <vector>
4+
#include <ydb/library/yql/dq/comp_nodes/hash_join_utils/block_layout_converter.h>
45
#include <yql/essentials/minikql/computation/mkql_block_reader.h>
56
#include <yql/essentials/minikql/computation/mkql_computation_node.h>
67
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
@@ -12,6 +13,46 @@ struct TColumnsMetadata {
1213
std::vector<TType*> ColumnTypes;
1314
};
1415

16+
enum class ESide { Probe, Build };
17+
18+
template <typename T> struct TSides {
19+
T Build;
20+
T Probe;
21+
22+
T& SelectSide(ESide side) {
23+
return side == ESide::Build ? Build : Probe;
24+
}
25+
26+
const T& SelectSide(ESide side) const {
27+
return side == ESide::Build ? Build : Probe;
28+
}
29+
};
30+
31+
32+
/*
33+
usage:
34+
instead of copy pasting code and changing "build" to "probe", use TSides and ForEachSide to call same code twice.
35+
example:
36+
37+
void f(int buildSize, int probeSize, bool buildRequired, bool probeRequired){
38+
use(buildSize + (buildRequired ? 0 : transform(buildSize)));
39+
use(probeSize + (probeRequired ? 0 : transform(probeSize)));
40+
}
41+
42+
vs
43+
44+
void f(TSides<int> sizes, TSides<bool> required) {
45+
ForEachSide([&](ESide side){
46+
use(sizes.SelectSide(side) + (required.SelectSide(side) ? 0 : transform(sizes.SelectSide(side))));
47+
});
48+
}
49+
50+
*/
51+
void ForEachSide(std::invocable<ESide> auto fn) {
52+
fn(ESide::Build);
53+
fn(ESide::Probe);
54+
}
55+
1556
struct TJoinMetadata {
1657
TColumnsMetadata Build;
1758
TColumnsMetadata Probe;
@@ -21,7 +62,7 @@ struct TJoinMetadata {
2162
TKeyTypes KeyTypesFromColumns(const std::vector<TType*>& types, const std::vector<ui32>& keyIndexes);
2263

2364
template <EJoinKind Kind> struct TRenamedOutput {
24-
TRenamedOutput(TDqRenames renames, const std::vector<TType*>& leftColumnTypes,
65+
TRenamedOutput(TDqUserRenames renames, const std::vector<TType*>& leftColumnTypes,
2566
const std::vector<TType*>& rightColumnTypes)
2667
: OutputBuffer()
2768
, NullTuples(std::max(leftColumnTypes.size(), rightColumnTypes.size()), NYql::NUdf::TUnboxedValuePod{})
@@ -46,13 +87,6 @@ template <EJoinKind Kind> struct TRenamedOutput {
4687
MKQL_ENSURE(tuple != nullptr, "null output row in semi/only join?");
4788
for (int index = 0; index < std::ssize(Renames); ++index) {
4889
auto thisRename = Renames[index];
49-
if constexpr (Kind == EJoinKind::LeftOnly || Kind == EJoinKind::LeftSemi) {
50-
MKQL_ENSURE(thisRename.Side == JoinSide::kLeft,
51-
"rename has right column for LeftOnly or LeftSemi join??");
52-
} else {
53-
MKQL_ENSURE(thisRename.Side == JoinSide::kRight,
54-
"rename has left column for RightOnly or RightSemi join??");
55-
}
5690
OutputBuffer.push_back(tuple[thisRename.Index]);
5791
}
5892
};
@@ -67,7 +101,7 @@ template <EJoinKind Kind> struct TRenamedOutput {
67101
}
68102
for (int index = 0; index < std::ssize(Renames); ++index) {
69103
auto thisRename = Renames[index];
70-
if (thisRename.Side == JoinSide::kLeft) {
104+
if (thisRename.Side == EJoinSide::kLeft) {
71105
OutputBuffer.push_back(probe[thisRename.Index]);
72106
} else {
73107
OutputBuffer.push_back(build[thisRename.Index]);
@@ -80,13 +114,14 @@ template <EJoinKind Kind> struct TRenamedOutput {
80114

81115
private:
82116
const std::vector<NYql::NUdf::TUnboxedValue> NullTuples;
83-
const TDqRenames Renames;
117+
const TDqUserRenames Renames;
84118
};
85119

86120
// Some joins produce concatenation of 2 tuples, some produce one tuple(effectively)
87-
template <typename Fun>
88-
concept JoinMatchFun =
89-
std::invocable<Fun, NJoinTable::TTuple> || std::invocable<Fun, NJoinTable::TTuple, NJoinTable::TTuple>;
121+
template <typename Fun, typename Tuple>
122+
concept JoinMatchFun = std::invocable<Fun, NJoinTable::TTuple> || std::invocable<Fun, TSides<Tuple>>;
123+
124+
IBlockLayoutConverter::TPackResult Flatten(std::vector<IBlockLayoutConverter::TPackResult> tuples);
90125

91126
template <typename Source, EJoinKind Kind> class TJoin : public TComputationValue<TJoin<Source, Kind>> {
92127
using TBase = TComputationValue<TJoin>;
@@ -120,7 +155,7 @@ template <typename Source, EJoinKind Kind> class TJoin : public TComputationValu
120155
return Build_.UserDataSize();
121156
}
122157

123-
EFetchResult MatchRows(TComputationContext& ctx, JoinMatchFun auto consumeOneOrTwoTuples) {
158+
EFetchResult MatchRows(TComputationContext& ctx, auto consumeOneOrTwoTuples) {
124159
while (!Build_.Finished()) {
125160
auto res = Build_.ForEachRow(ctx, [&](auto tuple) { Table_.Add({tuple, tuple + Build_.UserDataSize()}); });
126161
switch (res) {
@@ -163,27 +198,32 @@ template <typename Source, EJoinKind Kind> class TJoin : public TComputationValu
163198
});
164199
switch (result) {
165200
case NYql::NUdf::EFetchStatus::Finish: {
201+
int consumedTotal = 0;
166202
if (Table_.UnusedTrackingOn()) {
167203
if constexpr (Kind == EJoinKind::RightSemi) {
168204
for (auto& v : Table_.MapView()) {
169205
if (v.second.Used) {
170206
for (NJoinTable::TTuple used : v.second.Tuples) {
207+
208+
++consumedTotal;
171209
consumeOneOrTwoTuples(used);
172210
}
173211
}
174212
}
175213
}
176214
Table_.ForEachUnused([&](NJoinTable::TTuple unused) {
177215
if constexpr (Kind == EJoinKind::RightOnly) {
216+
++consumedTotal;
178217
consumeOneOrTwoTuples(unused);
179218
}
180219
if constexpr (Kind == EJoinKind::Exclusion || Kind == EJoinKind::Right ||
181220
Kind == EJoinKind::Full) {
221+
++consumedTotal;
182222
consumeOneOrTwoTuples(nullptr, unused);
183223
}
184224
});
185225
}
186-
return EFetchResult::One;
226+
return consumedTotal == 0 ? EFetchResult::Finish : EFetchResult::One;
187227
}
188228
case NYql::NUdf::EFetchStatus::Yield: {
189229
return EFetchResult::Yield;
@@ -208,4 +248,99 @@ template <typename Source, EJoinKind Kind> class TJoin : public TComputationValu
208248
NJoinTable::TStdJoinTable Table_;
209249
};
210250

251+
template <typename Source> class TJoinPackedTuples {
252+
public:
253+
using TTable = NJoinTable::TNeumannJoinTable;
254+
255+
TJoinPackedTuples(TSides<Source> sources, NUdf::TLoggerPtr logger, TString componentName,
256+
TSides<const NPackedTuple::TTupleLayout*> layouts)
257+
: Logger_(logger)
258+
, LogComponent_(logger->RegisterComponent(componentName))
259+
, Sources_(std::move(sources))
260+
, Layouts_(layouts)
261+
, Table_(Layouts_.Build)
262+
{}
263+
264+
IBlockLayoutConverter::TPackResult Flatten(std::vector<IBlockLayoutConverter::TPackResult> tuples) {
265+
IBlockLayoutConverter::TPackResult flattened;
266+
flattened.NTuples = std::accumulate(tuples.begin(), tuples.end(), i64{0},
267+
[](i64 summ, const auto& packRes) { return summ += packRes.NTuples; });
268+
269+
i64 totalTuplesSize = std::accumulate(tuples.begin(), tuples.end(), i64{0}, [](i64 summ, const auto& packRes) {
270+
return summ += std::ssize(packRes.PackedTuples);
271+
});
272+
flattened.PackedTuples.reserve(totalTuplesSize);
273+
274+
i64 totaOverflowlSize =
275+
std::accumulate(tuples.begin(), tuples.end(), i64{0},
276+
[](i64 summ, const auto& packRes) { return summ += std::ssize(packRes.Overflow); });
277+
flattened.Overflow.reserve(totaOverflowlSize);
278+
279+
int tupleSize = Layouts_.Build->TotalRowSize;
280+
for (const IBlockLayoutConverter::TPackResult& tupleBatch : tuples) {
281+
Layouts_.Build->Concat(flattened.PackedTuples, flattened.Overflow,
282+
std::ssize(flattened.PackedTuples) / tupleSize, tupleBatch.PackedTuples.data(),
283+
tupleBatch.Overflow.data(), tupleBatch.PackedTuples.size() / tupleSize,
284+
tupleBatch.Overflow.size());
285+
}
286+
return flattened;
287+
}
288+
289+
EFetchResult MatchRows([[maybe_unused]] TComputationContext& ctx,
290+
JoinMatchFun<TTable::Tuple> auto consumeOneOrTwoTuples) {
291+
while (!Sources_.Build.Finished()) {
292+
FetchResult<IBlockLayoutConverter::TPackResult> var = Sources_.Build.FetchRow();
293+
switch (AsStatus(var)) {
294+
case NYql::NUdf::EFetchStatus::Finish: {
295+
Table_.BuildWith(Flatten(BuildChunks_));
296+
break;
297+
}
298+
case NYql::NUdf::EFetchStatus::Yield: {
299+
return EFetchResult::Yield;
300+
}
301+
case NYql::NUdf::EFetchStatus::Ok: {
302+
auto& packResult = std::get<One<IBlockLayoutConverter::TPackResult>>(var);
303+
BuildChunks_.push_back(std::move(packResult.Data));
304+
break;
305+
}
306+
default:
307+
MKQL_ENSURE(false, "unreachable");
308+
}
309+
}
310+
if (Table_.Empty()) {
311+
return EFetchResult::Finish; // is it ok?
312+
}
313+
314+
if (!Sources_.Probe.Finished()) {
315+
const FetchResult<IBlockLayoutConverter::TPackResult> var = Sources_.Probe.FetchRow();
316+
const NKikimr::NMiniKQL::EFetchResult resEnum = AsResult(var);
317+
318+
if (resEnum == EFetchResult::One) {
319+
const IBlockLayoutConverter::TPackResult& thisPackResult =
320+
std::get<One<IBlockLayoutConverter::TPackResult>>(var).Data;
321+
for (int index = 0; index < thisPackResult.NTuples; ++index) {
322+
const ui8* thisRow = &thisPackResult.PackedTuples[index * Layouts_.Probe->TotalRowSize];
323+
TTable::Tuple probeRow{thisRow, thisPackResult.Overflow.data()};
324+
Table_.Lookup(probeRow, [&](TTable::Tuple matchedBuildRow) {
325+
consumeOneOrTwoTuples(TSides<TTable::Tuple>{.Build = matchedBuildRow, .Probe = probeRow});
326+
});
327+
}
328+
}
329+
330+
return resEnum;
331+
}
332+
333+
return EFetchResult::Finish;
334+
}
335+
336+
private:
337+
const NUdf::TLoggerPtr Logger_;
338+
const NUdf::TLogComponentId LogComponent_;
339+
TSides<Source> Sources_;
340+
TSides<const NPackedTuple::TTupleLayout*> Layouts_;
341+
TTable Table_;
342+
IBlockLayoutConverter::TPackResult BuildData_;
343+
std::vector<IBlockLayoutConverter::TPackResult> BuildChunks_;
344+
};
345+
211346
} // namespace NKikimr::NMiniKQL

ydb/library/yql/dq/comp_nodes/dq_scalar_hash_join.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ template <EJoinKind Kind> class TScalarHashJoinState : public TComputationValue<
6464
IComputationWideFlowNode* rightFlow, const std::vector<ui32>& leftKeyColumns,
6565
const std::vector<ui32>& rightKeyColumns, const std::vector<TType*>& leftColumnTypes,
6666
const std::vector<TType*>& rightColumnTypes, NUdf::TLoggerPtr logger, TString componentName,
67-
TDqRenames renames)
67+
TDqUserRenames renames)
6868
: NKikimr::NMiniKQL::TComputationValue<TScalarHashJoinState>(memInfo)
6969
, Join_(memInfo, TScalarRowSource{leftFlow, leftColumnTypes}, TScalarRowSource{rightFlow, rightColumnTypes},
7070
TJoinMetadata{TColumnsMetadata{rightKeyColumns, rightColumnTypes},
@@ -113,7 +113,7 @@ class TScalarHashJoinWrapper : public TStatefulWideFlowComputationNode<TScalarHa
113113
TVector<TType*>&& leftColumnTypes,
114114
TVector<ui32>&& leftKeyColumns,
115115
TVector<TType*>&& rightColumnTypes,
116-
TVector<ui32>&& rightKeyColumns, TDqRenames renames)
116+
TVector<ui32>&& rightKeyColumns, TDqUserRenames renames)
117117
: TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed)
118118
, LeftFlow_(leftFlow)
119119
, RightFlow_(rightFlow)
@@ -155,7 +155,7 @@ class TScalarHashJoinWrapper : public TStatefulWideFlowComputationNode<TScalarHa
155155
const TVector<ui32> LeftKeyColumns_;
156156
const TVector<TType*> RightColumnTypes_;
157157
const TVector<ui32> RightKeyColumns_;
158-
const TDqRenames Renames_;
158+
const TDqUserRenames Renames_;
159159
};
160160

161161
} // namespace
@@ -216,7 +216,7 @@ IComputationWideFlowNode* WrapDqScalarHashJoin(TCallable& callable, const TCompu
216216
MKQL_ENSURE(rightFlow, "Expected WideFlow as a right input");
217217
MKQL_ENSURE(joinKind == EJoinKind::Inner, "Only inner is supported, see gh#26780 for details.");
218218

219-
TDqRenames renames =
219+
TDqUserRenames renames =
220220
FromGraceFormat(TGraceJoinRenames::FromRuntimeNodes(callable.GetInput(5), callable.GetInput(6)));
221221
ValidateRenames(renames, joinKind, std::ssize(leftFlowItems), std::ssize(rightFlowItems));
222222
return new TScalarHashJoinWrapper<EJoinKind::Inner>(ctx.Mutables, leftFlow, rightFlow, std::move(joinItems),

0 commit comments

Comments
 (0)