diff --git a/ydb/core/kqp/tools/.clang-format b/ydb/core/kqp/tools/.clang-format index af073d3003c4..ead5af2f2f32 100644 --- a/ydb/core/kqp/tools/.clang-format +++ b/ydb/core/kqp/tools/.clang-format @@ -9,4 +9,7 @@ ColumnLimit: 120 AllowShortLambdasOnASingleLine: Inline AllowShortFunctionsOnASingleLine: Empty PackConstructorInitializers: Never -BreakConstructorInitializers: BeforeComma \ No newline at end of file +BreakConstructorInitializers: BeforeComma +# BinPackLongBracedList: wait for clang-format-21 +BinPackArguments: false +BinPackParameters: false \ No newline at end of file diff --git a/ydb/core/kqp/tools/join_perf/benchmark_settings.h b/ydb/core/kqp/tools/join_perf/benchmark_settings.h index 4711f7b895d4..8f105336c514 100644 --- a/ydb/core/kqp/tools/join_perf/benchmark_settings.h +++ b/ydb/core/kqp/tools/join_perf/benchmark_settings.h @@ -18,7 +18,7 @@ struct TPreset { }; struct TBenchmarkSettings { - + int Seed; TVector Presets; TSet KeyTypes; TSet Algorithms; diff --git a/ydb/core/kqp/tools/join_perf/graph.py b/ydb/core/kqp/tools/join_perf/bin/graph.py similarity index 86% rename from ydb/core/kqp/tools/join_perf/graph.py rename to ydb/core/kqp/tools/join_perf/bin/graph.py index 69006bad91d4..933ea4a31510 100644 --- a/ydb/core/kqp/tools/join_perf/graph.py +++ b/ydb/core/kqp/tools/join_perf/bin/graph.py @@ -1,4 +1,5 @@ import json +from matplotlib.ticker import FormatStrFormatter, ScalarFormatter import pandas as pd import matplotlib.pyplot as plt import sys @@ -18,11 +19,11 @@ name_parts = run_name.split('_') only_needed.append({ 'run_name': run_name, - 'time': obj["resultTime"], + 'time': int(obj["resultTime"]), 'join_algorithm': name_parts[0], 'input_data_flavour': name_parts[2], - 'left_table_size': name_parts[3], - 'right_table_size': name_parts[4], + 'left_table_size': int(name_parts[3]), + 'right_table_size': int(name_parts[4]), 'key_type': name_parts[1] } ) @@ -45,6 +46,8 @@ def geo_mean_70percent_lowest(series): log_images = images_root + "log" 
Path(simple_images).mkdir(parents=True, exist_ok=True) Path(log_images).mkdir(parents=True, exist_ok=True) +pd.set_option('display.max_rows', 500) + data_flovours = df['input_data_flavour'].unique() key_types = df['key_type'].unique() for data_flavour in data_flovours: @@ -56,7 +59,8 @@ def geo_mean_70percent_lowest(series): fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 8), sharex=True) for name, group in subset.groupby('join_algorithm'): - group = group.groupby('left_table_size')['time'].apply(lambda x: geo_mean_70percent_lowest(x)).sort_values() + group = group.groupby('left_table_size')['time'].apply(lambda x: geo_mean_70percent_lowest(x)).sort_index() + axes.set_xticks(group.index) axes.plot( group.index, group.values, @@ -65,6 +69,7 @@ def geo_mean_70percent_lowest(series): ) axes.set_ylabel('time') axes.set_xlabel('left_rows') + axes.get_xaxis().set_major_formatter(FormatStrFormatter('%d')) axes.legend() fig.suptitle(graph_name, fontsize=16) diff --git a/ydb/core/kqp/tools/join_perf/main.cpp b/ydb/core/kqp/tools/join_perf/bin/main.cpp similarity index 81% rename from ydb/core/kqp/tools/join_perf/main.cpp rename to ydb/core/kqp/tools/join_perf/bin/main.cpp index a28795357806..a7a5a39d8372 100644 --- a/ydb/core/kqp/tools/join_perf/main.cpp +++ b/ydb/core/kqp/tools/join_perf/bin/main.cpp @@ -1,12 +1,12 @@ -#include "benchmark_settings.h" +#include -#include "ydb/core/kqp/tools/combiner_perf/fs_utils.h" +#include #include #include #include #include -#include "joins.h" +#include #include #include @@ -40,7 +40,7 @@ int main(int argc, char** argv) { int samples = 1; int scale = 1; opts.AddHelpOption().Help("visit NBenchmarkSizes namespace in benchmark_settings.cpp for explanation"); - opts.AddLongOption('c', "case") + opts.AddLongOption('p', "preset") .Help("left and right table sizes to choose for joins benchmark.") .Choices({"exp", "linear", "small"}) .DefaultValue("small") @@ -58,13 +58,14 @@ int main(int argc, char** argv) { } }(); }); - 
opts.AddLongOption('s', "samples").Help("number representing how much to repeat single case. useful for noise reduction.").DefaultValue(1).StoreResult(&samples); - opts.AddLongOption("scale").Help("size of smallest table in case").DefaultValue(1<<18).StoreResult(&scale); + opts.AddLongOption("samples").Help("number representing how much to repeat single case. useful for noise reduction.").DefaultValue(1).StoreResult(&samples); + opts.AddLongOption("scale").Help("size of smallest table in case").DefaultValue(1).StoreResult(&scale); + opts.AddLongOption("seed").Help("seed for keys generation").DefaultValue(123).StoreResult(&params.Seed); params.Algorithms = { NKikimr::NMiniKQL::ETestedJoinAlgo::kBlockMap, - // NKikimr::NMiniKQL::ETestedJoinAlgo::kBlockHash, - NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarMap, - // NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarHash, + NKikimr::NMiniKQL::ETestedJoinAlgo::kBlockHash, + // NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarMap, // slow + NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarHash, NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarGrace, }; params.KeyTypes = { diff --git a/ydb/core/kqp/tools/join_perf/bin/ya.make b/ydb/core/kqp/tools/join_perf/bin/ya.make new file mode 100644 index 000000000000..3c9ec7a48b6c --- /dev/null +++ b/ydb/core/kqp/tools/join_perf/bin/ya.make @@ -0,0 +1,27 @@ +PROGRAM(join_perf) + +YQL_LAST_ABI_VERSION() + +IF (MKQL_RUNTIME_VERSION) + CFLAGS( + -DMKQL_RUNTIME_VERSION=$MKQL_RUNTIME_VERSION + ) +ENDIF() + +PEERDIR( + ydb/core/kqp/tools/combiner_perf + ydb/core/kqp/tools/join_perf + library/cpp/lfalloc/alloc_profiler + library/cpp/dwarf_backtrace + library/cpp/dwarf_backtrace/registry + library/cpp/getopt + library/cpp/getopt/small + library/cpp/json +) + +SRCS( + main.cpp +) + +END() + diff --git a/ydb/core/kqp/tools/join_perf/construct_join_graph.cpp b/ydb/core/kqp/tools/join_perf/construct_join_graph.cpp index 955fa8cffa61..d5847b1978ad 100644 --- a/ydb/core/kqp/tools/join_perf/construct_join_graph.cpp +++ 
b/ydb/core/kqp/tools/join_perf/construct_join_graph.cpp @@ -1,16 +1,18 @@ #include "construct_join_graph.h" #include +#include #include #include +#include namespace NKikimr::NMiniKQL { namespace { -TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, TRuntimeNode leftList, +TRuntimeNode BuildBlockJoin(TDqProgramBuilder& pgmBuilder, EJoinKind joinKind, TRuntimeNode leftList, TArrayRef leftKeyColumns, const TVector& leftKeyDrops, - TRuntimeNode rightList, TArrayRef rightKeyColumns, - const TVector& rightKeyDrops, bool rightAny) { + TRuntimeNode rightList, + TArrayRef rightKeyColumns, const TVector& rightKeyDrops, bool rightAny) { const auto leftStream = ToWideStream(pgmBuilder, leftList); const auto rightBlockList = ToBlockList(pgmBuilder, rightList); @@ -59,26 +61,42 @@ bool IsBlockJoin(ETestedJoinAlgo kind) { return kind == ETestedJoinAlgo::kBlockHash || kind == ETestedJoinAlgo::kBlockMap; } -THolder ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, TInnerJoinDescription descr) { +THolder ConstructJoinGraphStream(EJoinKind joinKind, ETestedJoinAlgo algo, TJoinDescription descr) { - const EJoinKind kInnerJoin = EJoinKind::Inner; const bool scalar = !IsBlockJoin(algo); TDqProgramBuilder& dqPb = descr.Setup->GetDqProgramBuilder(); TProgramBuilder& pb = static_cast(dqPb); - TVector resultTypesArr; + TVector resultTypesArr; TVector leftRenames, rightRenames; - for (ui32 idx = 0; idx < std::ssize(descr.LeftSource.ColumnTypes); ++idx) { - resultTypesArr.push_back(descr.LeftSource.ColumnTypes[idx]); - leftRenames.push_back(idx); - leftRenames.push_back(idx); + if (joinKind != EJoinKind::RightOnly && joinKind != EJoinKind::RightSemi) { + for (int colIndex = 0; colIndex < std::ssize(descr.LeftSource.ColumnTypes); ++colIndex) { + leftRenames.push_back(colIndex); + leftRenames.push_back(colIndex); + } + for (auto& resType : descr.LeftSource.ColumnTypes) { + resultTypesArr.push_back([&] { + if (ForceLeftOptional(joinKind) && !resType->IsOptional()) { + 
return pb.NewOptionalType(resType); + } else { + return resType; + } + }()); + } } - for (ui32 idx = 0; idx < std::ssize(descr.RightSource.ColumnTypes); ++idx) { - if (std::ranges::all_of(descr.RightSource.KeyColumnIndexes, - [idx](ui32 keyColumnIdx) { return keyColumnIdx != idx; })) { - rightRenames.push_back(idx); - rightRenames.push_back(resultTypesArr.size()); - resultTypesArr.push_back(descr.RightSource.ColumnTypes[idx]); + if (joinKind != EJoinKind::LeftOnly && joinKind != EJoinKind::LeftSemi) { + for (int colIndex = 0; colIndex < std::ssize(descr.RightSource.ColumnTypes); ++colIndex) { + rightRenames.push_back(colIndex); + rightRenames.push_back(colIndex + std::ssize(resultTypesArr)); + } + for (auto* resType : descr.RightSource.ColumnTypes) { + resultTypesArr.push_back([&] { + if (ForceRightOptional(joinKind) && !resType->IsOptional()) { + return pb.NewOptionalType(resType); + } else { + return resType; + } + }()); } } @@ -139,7 +157,7 @@ THolder ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T case ETestedJoinAlgo::kScalarGrace: { - return dqPb.FromFlow(dqPb.GraceJoin(ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), kInnerJoin, + return dqPb.FromFlow(dqPb.GraceJoin(ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), joinKind, descr.LeftSource.KeyColumnIndexes, descr.RightSource.KeyColumnIndexes, leftRenames, rightRenames, dqPb.NewFlowType(multiResultType))); } @@ -173,7 +191,7 @@ THolder ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T std::ssize(descr.RightSource.ColumnTypes) - descr.RightSource.KeyColumnIndexes.size()); TRuntimeNode mapJoinSomething = - pb.MapJoinCore(source, rightDict, kInnerJoin, descr.LeftSource.KeyColumnIndexes, scalarMapRenames.Left, + pb.MapJoinCore(source, rightDict, joinKind, descr.LeftSource.KeyColumnIndexes, scalarMapRenames.Left, scalarMapRenames.Right, pb.NewFlowType(pb.NewTupleType(resultTypesArr))); return ToWideStream( @@ -183,21 +201,23 @@ THolder ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T } 
case ETestedJoinAlgo::kBlockMap: { TVector kEmptyColumnDrops; - TVector kRightDroppedColumns; - std::copy(descr.RightSource.KeyColumnIndexes.begin(), descr.RightSource.KeyColumnIndexes.end(), - std::back_inserter(kRightDroppedColumns)); - return BuildBlockJoin(pb, kInnerJoin, args.Left, descr.LeftSource.KeyColumnIndexes, kEmptyColumnDrops, - args.Right, descr.RightSource.KeyColumnIndexes, kRightDroppedColumns, false); + return BuildBlockJoin(dqPb, joinKind, args.Left, descr.LeftSource.KeyColumnIndexes, kEmptyColumnDrops, + args.Right, descr.RightSource.KeyColumnIndexes, kEmptyColumnDrops, false); } case ETestedJoinAlgo::kBlockHash: { - return dqPb.DqBlockHashJoin(ToWideStream(dqPb, args.Left), ToWideStream(dqPb, args.Right), kInnerJoin, + TVector blockResultTypes; + for (TType* type : resultTypesArr) { + blockResultTypes.push_back(pb.NewBlockType(type, TBlockType::EShape::Many)); + } + blockResultTypes.push_back(dqPb.LastScalarIndexBlock()); + return dqPb.DqBlockHashJoin(ToWideStream(dqPb, args.Left), ToWideStream(dqPb, args.Right), joinKind, descr.LeftSource.KeyColumnIndexes, descr.RightSource.KeyColumnIndexes, - pb.NewStreamType(multiResultType)); + pb.NewStreamType(pb.NewMultiType(blockResultTypes))); } case ETestedJoinAlgo::kScalarHash: { return pb.FromFlow(dqPb.DqScalarHashJoin( - ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), kInnerJoin, descr.LeftSource.KeyColumnIndexes, + ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), joinKind, descr.LeftSource.KeyColumnIndexes, descr.RightSource.KeyColumnIndexes, pb.NewFlowType(multiResultType))); } default: @@ -207,13 +227,12 @@ THolder ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T return graphFrom(wideStream); } -i32 ResultColumnCount(ETestedJoinAlgo algo, TInnerJoinDescription descr) { +i32 ResultColumnCount(ETestedJoinAlgo algo, TJoinDescription descr) { /* +1 in block case because yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp:TBlockJoinState::GetOutputWidth(); */ - return 
IsBlockJoin(algo) + std::ssize(descr.LeftSource.ColumnTypes) + std::ssize(descr.RightSource.ColumnTypes) - - std::ssize(descr.LeftSource.KeyColumnIndexes); + return IsBlockJoin(algo) + std::ssize(descr.LeftSource.ColumnTypes) + std::ssize(descr.RightSource.ColumnTypes); } } // namespace NKikimr::NMiniKQL \ No newline at end of file diff --git a/ydb/core/kqp/tools/join_perf/construct_join_graph.h b/ydb/core/kqp/tools/join_perf/construct_join_graph.h index aca1c9e9623d..6f56fc1a6ce2 100644 --- a/ydb/core/kqp/tools/join_perf/construct_join_graph.h +++ b/ydb/core/kqp/tools/join_perf/construct_join_graph.h @@ -2,6 +2,7 @@ #include "benchmark_settings.h" #include #include +#include namespace NKikimr::NMiniKQL { @@ -11,7 +12,7 @@ struct TJoinSourceData { NYql::NUdf::TUnboxedValue ValuesList; }; -struct TInnerJoinDescription { +struct TJoinDescription { TJoinSourceData LeftSource; TJoinSourceData RightSource; TDqSetup* Setup; @@ -19,7 +20,7 @@ struct TInnerJoinDescription { bool IsBlockJoin(ETestedJoinAlgo algo); -THolder ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, TInnerJoinDescription descr); +THolder ConstructJoinGraphStream(EJoinKind joinKind, ETestedJoinAlgo algo, TJoinDescription descr); -i32 ResultColumnCount(ETestedJoinAlgo algo, TInnerJoinDescription descr); +i32 ResultColumnCount(ETestedJoinAlgo algo, TJoinDescription descr); } // namespace NKikimr::NMiniKQL \ No newline at end of file diff --git a/ydb/core/kqp/tools/join_perf/joins.cpp b/ydb/core/kqp/tools/join_perf/joins.cpp index ad5b790d77e1..24e63056cb6b 100644 --- a/ydb/core/kqp/tools/join_perf/joins.cpp +++ b/ydb/core/kqp/tools/join_perf/joins.cpp @@ -28,14 +28,14 @@ TVector GenerateStringKeyColumn(i32 size, i32 seed) { } template -NKikimr::NMiniKQL::TInnerJoinDescription PrepareDescription(NKikimr::NMiniKQL::TDqSetup* setup, - TVector leftKeys, TVector rightKeys) { +NKikimr::NMiniKQL::TJoinDescription +PrepareDescription(NKikimr::NMiniKQL::TDqSetup* setup, TVector leftKeys, TVector rightKeys) { 
const int leftSize = std::ssize(leftKeys); const int rightSize = std::ssize(rightKeys); - NKikimr::NMiniKQL::TInnerJoinDescription descr; + NKikimr::NMiniKQL::TJoinDescription descr; descr.Setup = setup; - std::tie(descr.LeftSource.ColumnTypes, descr.LeftSource.ValuesList) = ConvertVectorsToRuntimeTypesAndValue( - *setup, std::move(leftKeys), TVector(leftSize, 111), TVector(leftSize, "meow")); + std::tie(descr.LeftSource.ColumnTypes, descr.LeftSource.ValuesList) = + ConvertVectorsToRuntimeTypesAndValue(*setup, std::move(leftKeys), TVector(leftSize, 111)); std::tie(descr.RightSource.ColumnTypes, descr.RightSource.ValuesList) = ConvertVectorsToRuntimeTypesAndValue(*setup, std::move(rightKeys), TVector(rightSize, "woo")); return descr; @@ -64,15 +64,16 @@ TVector NKikimr::NMiniKQL::RunJoinsBench(const TBenchmarkS for (auto keyPreset : params.Presets) { for (auto sizes : keyPreset.Cases) { NKikimr::NMiniKQL::TDqSetup setup{NKikimr::NMiniKQL::GetPerfTestFactory()}; - TInnerJoinDescription descr = [&] { + TJoinDescription descr = [&] { using enum ETestedJoinKeyType; switch (keyType) { + case kString: { - return PrepareDescription(&setup, GenerateStringKeyColumn(sizes.Left, 123), + return PrepareDescription(&setup, GenerateStringKeyColumn(sizes.Left, params.Seed), GenerateStringKeyColumn(sizes.Right, 111)); } case kInteger: { - return PrepareDescription(&setup, GenerateIntegerKeyColumn(sizes.Left, 123), + return PrepareDescription(&setup, GenerateIntegerKeyColumn(sizes.Left, params.Seed), GenerateIntegerKeyColumn(sizes.Right, 111)); } default: @@ -85,9 +86,9 @@ TVector NKikimr::NMiniKQL::RunJoinsBench(const TBenchmarkS TBenchmarkCaseResult result; result.CaseName = CaseName(algo, keyType, keyPreset, sizes); - + result.CaseName += Sprintf("_seed:_%i", params.Seed); THolder wideStreamGraph = - ConstructInnerJoinGraphStream(algo, descr); + ConstructJoinGraphStream(EJoinKind::Inner, algo, descr); NYql::NUdf::TUnboxedValue wideStream = wideStreamGraph->GetValue(); std::vector 
fetchBuff; ui32 cols = NKikimr::NMiniKQL::ResultColumnCount(algo, descr); @@ -106,7 +107,7 @@ TVector NKikimr::NMiniKQL::RunJoinsBench(const TBenchmarkS } result.RunDuration = TDuration::MicroSeconds(ThreadCPUTime() - timeStartMicroSeconds); - Cerr << ". Output line count: " << lineCount << Endl; + Cerr << Sprintf(". output line count: %i, time took: %ims.", lineCount, result.RunDuration.MilliSeconds()) << Endl; ret.push_back(result); } } diff --git a/ydb/core/kqp/tools/join_perf/ya.make b/ydb/core/kqp/tools/join_perf/ya.make index 720cbec0328f..02bddfc7daea 100644 --- a/ydb/core/kqp/tools/join_perf/ya.make +++ b/ydb/core/kqp/tools/join_perf/ya.make @@ -1,4 +1,4 @@ -PROGRAM(join_perf) +LIBRARY() YQL_LAST_ABI_VERSION() @@ -19,7 +19,6 @@ PEERDIR( ) SRCS( - main.cpp construct_join_graph.cpp joins.cpp benchmark_settings.cpp diff --git a/ydb/core/kqp/ut/join/kqp_block_hash_join_ut.cpp b/ydb/core/kqp/ut/join/kqp_block_hash_join_ut.cpp index e0581a4c30c5..1f30ca5d7933 100644 --- a/ydb/core/kqp/ut/join/kqp_block_hash_join_ut.cpp +++ b/ydb/core/kqp/ut/join/kqp_block_hash_join_ut.cpp @@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(KqpBlockHashJoin) { auto resultSet = status.GetResultSets()[0]; // Current Join implementation is simple and returns all the rows - auto expectedRowsCount = UseBlockHashJoin ? 
6 : 3; + auto expectedRowsCount = 3; UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), expectedRowsCount); auto explainResult = queryClient.ExecuteQuery( diff --git a/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp b/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp index e754cbf5a11f..f78ce29cb1f4 100644 --- a/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp +++ b/ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp @@ -1,37 +1,93 @@ #include "dq_block_hash_join.h" +#include #include #include #include #include -#include #include +#include #include -#include #include #include +#include "dq_join_common.h" + namespace NKikimr::NMiniKQL { namespace { -class TBlockHashJoinWrapper : public TMutableComputationNode { -private: +class TBlockRowSource : public NNonCopyable::TMoveOnly { + public: + TBlockRowSource(TComputationContext& ctx, IComputationNode* stream, const std::vector& types) + : Stream_(stream) + , Values_(Stream_->GetValue(ctx)) + , Buff_(types.size()) + { + TTypeInfoHelper typeInfoHelper; + for (int index = 0; index < std::ssize(InputReaders); ++index) { + InputReaders[index] = NYql::NUdf::MakeBlockReader(typeInfoHelper, types[index]); + InputItemConverters[index] = + MakeBlockItemConverter(typeInfoHelper, types[index], ctx.Builder->GetPgBuilder()); + } + } + + bool Finished() const { + return Finished_; + } + + int UserDataSize() const { + return Buff_.size() - 1; + } + + NYql::NUdf::EFetchStatus ForEachRow(TComputationContext& ctx, std::invocable auto consume) { + auto res = Values_.WideFetch(Buff_.data(), Buff_.size()); + if (res != NYql::NUdf::EFetchStatus::Ok) { + if (res == NYql::NUdf::EFetchStatus::Finish) { + Finished_ = true; + } + return res; + } + const int cols = UserDataSize(); + + for (int index = 0; index < cols; ++index) { + Blocks_[index] = &TArrowBlock::From(Buff_[index]).GetDatum(); + } + + const int rows = ArrowScalarAsInt(TArrowBlock::From(Buff_.back())); + + for (int rowIndex = 0; rowIndex < rows; ++rowIndex) { + for (int 
colIndex = 0; colIndex < cols; ++colIndex) { + ConsumeBuff_[colIndex] = InputItemConverters[colIndex]->MakeValue( + InputReaders[colIndex]->GetItem(*Blocks_[colIndex]->array(), rowIndex), ctx.HolderFactory); + } + consume(ConsumeBuff_.data()); + } + return NYql::NUdf::EFetchStatus::Ok; + } + + private: + bool Finished_ = false; + IComputationNode* Stream_; + NYql::NUdf::TUnboxedValue Values_; + TUnboxedValueVector Buff_; + TUnboxedValueVector ConsumeBuff_{Buff_.size() - 1}; + std::vector Blocks_{Buff_.size() - 1}; + std::vector> InputReaders{Buff_.size() - 1}; + std::vector> InputItemConverters{Buff_.size() - 1}; +}; + +template class TBlockHashJoinWrapper : public TMutableComputationNode> { + private: using TBaseComputation = TMutableComputationNode; -public: - TBlockHashJoinWrapper( - TComputationMutables& mutables, - const TVector&& resultItemTypes, - const TVector&& leftItemTypes, - const TVector&& leftKeyColumns, - const TVector&& rightItemTypes, - const TVector&& rightKeyColumns, - IComputationNode* leftStream, - IComputationNode* rightStream - ) + public: + TBlockHashJoinWrapper(TComputationMutables& mutables, const TVector&& resultItemTypes, + const TVector&& leftItemTypes, const TVector&& leftKeyColumns, + const TVector&& rightItemTypes, const TVector&& rightKeyColumns, + IComputationNode* leftStream, IComputationNode* rightStream) : TBaseComputation(mutables, EValueRepresentation::Boxed) , ResultItemTypes_(std::move(resultItemTypes)) , LeftItemTypes_(std::move(leftItemTypes)) @@ -43,170 +99,128 @@ class TBlockHashJoinWrapper : public TMutableComputationNode( - ctx.HolderFactory, - LeftKeyColumns_, - RightKeyColumns_, - std::move(LeftStream_->GetValue(ctx)), - std::move(RightStream_->GetValue(ctx)), - ResultItemTypes_, - LeftItemTypes_.size(), // Left stream width - RightItemTypes_.size() // Right stream width - ); + return ctx.HolderFactory.Create(ctx, LeftKeyColumns_, RightKeyColumns_, LeftStream_, RightStream_, + LeftItemTypes_, RightItemTypes_, 
ResultItemTypes_); } -private: + private: class TStreamValue : public TComputationValue { using TBase = TComputationValue; - public: - TStreamValue( - TMemoryUsageInfo* memInfo, - const THolderFactory& holderFactory, - const TVector& leftKeyColumns, - const TVector& rightKeyColumns, - NUdf::TUnboxedValue&& leftStream, - NUdf::TUnboxedValue&& rightStream, - const TVector& resultItemTypes, - size_t leftStreamWidth, - size_t rightStreamWidth - ) + public: + TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector& leftKeyColumns, + const TVector& rightKeyColumns, IComputationNode* leftStream, IComputationNode* rightStream, + const TVector& leftStreamTypes, const TVector& rightStreamTypes, + const TVector& resultStreamTypes) : TBase(memInfo) - , LeftKeyColumns_(leftKeyColumns) - , RightKeyColumns_(rightKeyColumns) - , LeftStream_(std::move(leftStream)) - , RightStream_(std::move(rightStream)) - , HolderFactory_(holderFactory) - , ResultItemTypes_(resultItemTypes) - , LeftStreamWidth_(leftStreamWidth) - , RightStreamWidth_(rightStreamWidth) - { } - - private: - NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) override { - Y_DEBUG_ABORT_UNLESS(width == ResultItemTypes_.size()); - - Cerr << "WideFetch called: width=" << width - << " leftWidth=" << LeftStreamWidth_ - << " rightWidth=" << RightStreamWidth_ - << " leftFinished=" << LeftFinished_ - << " rightFinished=" << RightFinished_ << Endl; - - - if (!LeftFinished_) { - TVector leftInput(LeftStreamWidth_); - Cerr << "Trying to read left stream with width " << LeftStreamWidth_ << Endl; - auto status = LeftStream_.WideFetch(leftInput.data(), LeftStreamWidth_); - Cerr << "Left stream status: " << (int)status << Endl; - - switch (status) { - case NUdf::EFetchStatus::Ok: { - Cerr << "Left stream read successful!" 
<< Endl; - - size_t dataCols = std::min(static_cast(width), LeftStreamWidth_) - 1; - for (size_t i = 0; i < dataCols; i++) { - Cerr << "Copying leftInput[" << i << "] IsBoxed=" << leftInput[i].IsBoxed() - << " IsSpecial=" << leftInput[i].IsSpecial() - << " IsInvalid=" << leftInput[i].IsInvalid() << Endl; - output[i] = std::move(leftInput[i]); - Cerr << "Successfully copied leftInput[" << i << "]" << Endl; - } - - if (width > 0) { - size_t blockLengthSrcIdx = LeftStreamWidth_ - 1; - size_t blockLengthDstIdx = width - 1; - Cerr << "Copying block length from leftInput[" << blockLengthSrcIdx << "] to output[" << blockLengthDstIdx << "] IsBoxed=" - << leftInput[blockLengthSrcIdx].IsBoxed() - << " IsEmpty=" << !leftInput[blockLengthSrcIdx] - << " IsEmbedded=" << leftInput[blockLengthSrcIdx].IsEmbedded() << Endl; - - output[blockLengthDstIdx] = std::move(leftInput[blockLengthSrcIdx]); - } - - for (size_t i = dataCols; i < width - 1; i++) { - Cerr << "Creating empty array for output[" << i << "]" << Endl; - auto blockItemType = AS_TYPE(TBlockType, ResultItemTypes_[i])->GetItemType(); - std::shared_ptr arrowType; - MKQL_ENSURE(ConvertArrowType(blockItemType, arrowType), "Failed to convert type to arrow"); - auto emptyArray = arrow::MakeArrayOfNull(arrowType, 0); - ARROW_OK(emptyArray.status()); - output[i] = HolderFactory_.CreateArrowBlock(arrow::Datum(emptyArray.ValueOrDie())); - } - return NUdf::EFetchStatus::Ok; - } - - case NUdf::EFetchStatus::Yield: - return NUdf::EFetchStatus::Yield; - - case NUdf::EFetchStatus::Finish: - Cerr << "Left stream finished!" 
<< Endl; - LeftFinished_ = true; - break; + , Join_(memInfo, TBlockRowSource{ctx, leftStream, leftStreamTypes}, + TBlockRowSource{ctx, rightStream, rightStreamTypes}, + TJoinMetadata{TColumnsMetadata{rightKeyColumns, rightStreamTypes}, + TColumnsMetadata{leftKeyColumns, leftStreamTypes}, + KeyTypesFromColumns(leftStreamTypes, leftKeyColumns)}, ctx.MakeLogger(), + "BlockHashJoin") + , Ctx_(&ctx) + , OutputTypes_(resultStreamTypes) + { + TTypeInfoHelper typeInfoHelper; + for (auto outType : OutputTypes_) { + OutputItemConverters_.push_back( + MakeBlockItemConverter(typeInfoHelper, outType, ctx.Builder->GetPgBuilder())); + } + } + + int TupleSize() const { + return OutputTypes_.size() - 1 /*mind last integer column*/; + } + + int SizeTuples() const { + MKQL_ENSURE(OutputBuffer_.size() % TupleSize() == 0, "buffer contains tuple parts??"); + return OutputBuffer_.size() / TupleSize(); + } + + void FlushTo(NUdf::TUnboxedValue* output) { + MKQL_ENSURE(!OutputBuffer_.empty(), "make sure we are flushing something, not empty set of tuples"); + TTypeInfoHelper helper; + std::vector> blockBuilders; + int rows = SizeTuples(); + for (int i = 0; i < TupleSize(); ++i) { + blockBuilders.push_back(MakeArrayBuilder(helper, OutputTypes_[i], Ctx_->ArrowMemoryPool, rows, + &Ctx_->Builder->GetPgBuilder())); + } + + for (int rowIndex = 0; rowIndex < rows; ++rowIndex) { + for (int colIndex = 0; colIndex < TupleSize(); ++colIndex) { + int valueIndex = colIndex + rowIndex * TupleSize(); + blockBuilders[colIndex]->Add(OutputItemConverters_[colIndex]->MakeItem(OutputBuffer_[valueIndex])); } } - - if (!RightFinished_) { - TVector rightInput(RightStreamWidth_); - auto status = RightStream_.WideFetch(rightInput.data(), RightStreamWidth_); - - switch (status) { - case NUdf::EFetchStatus::Ok: { - Cerr << "Right stream read successful!" 
<< Endl; - size_t dataCols = std::min(static_cast(width), RightStreamWidth_) - 1; - for (size_t i = 0; i < dataCols; i++) { - Cerr << "Copying rightInput[" << i << "] IsBoxed=" << rightInput[i].IsBoxed() << Endl; - output[i] = std::move(rightInput[i]); - } - - if (width > 0) { - size_t blockLengthSrcIdx = RightStreamWidth_ - 1; - size_t blockLengthDstIdx = width - 1; - Cerr << "Copying block length from rightInput[" << blockLengthSrcIdx << "] to output[" << blockLengthDstIdx << "] IsBoxed=" - << rightInput[blockLengthSrcIdx].IsBoxed() - << " IsEmpty=" << !rightInput[blockLengthSrcIdx] - << " IsEmbedded=" << rightInput[blockLengthSrcIdx].IsEmbedded() << Endl; - - output[blockLengthDstIdx] = std::move(rightInput[blockLengthSrcIdx]); - } - - for (size_t i = dataCols; i < width - 1; i++) { - Cerr << "Creating empty array for output[" << i << "]" << Endl; - auto blockItemType = AS_TYPE(TBlockType, ResultItemTypes_[i])->GetItemType(); - std::shared_ptr arrowType; - MKQL_ENSURE(ConvertArrowType(blockItemType, arrowType), "Failed to convert type to arrow"); - auto emptyArray = arrow::MakeArrayOfNull(arrowType, 0); - ARROW_OK(emptyArray.status()); - output[i] = HolderFactory_.CreateArrowBlock(arrow::Datum(emptyArray.ValueOrDie())); + OutputBuffer_.clear(); + for (int colIndex = 0; colIndex < TupleSize(); ++colIndex) { + output[colIndex] = Ctx_->HolderFactory.CreateArrowBlock(blockBuilders[colIndex]->Build(true)); + } + output[TupleSize()] = Ctx_->HolderFactory.CreateArrowBlock(arrow::Datum(static_cast(rows))); + MKQL_ENSURE(OutputBuffer_.empty(), "something left after flush??"); + } + + private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) override { + MKQL_ENSURE(width == OutputTypes_.size(), + Sprintf("runtime(%i) vs compile-time(%i) tuple width mismatch", width, OutputTypes_.size())); + if (Finished_) { + return NYql::NUdf::EFetchStatus::Finish; + } + auto out = std::back_inserter(OutputBuffer_); + auto consumeOneOrTwo = [&] { + if constexpr 
(SemiOrOnlyJoin(Kind)) { + return [&](NJoinTable::TTuple tuple) { + MKQL_ENSURE(tuple != nullptr, "null output row in semi/only join?"); + std::copy_n(tuple, Join_.ProbeSize(), out); + }; + } else { + return [&](NJoinTable::TTuple probe, NJoinTable::TTuple build) { + if (!probe) { // todo: remove nullptr checks for some join types. + probe = NullTuples_.data(); + } + std::copy_n(probe, Join_.ProbeSize(), out); + + if (!build) { + build = NullTuples_.data(); + } + std::copy_n(build, Join_.BuildSize(), out); + }; + } + }(); + while (SizeTuples() < Threshold) { + auto res = Join_.MatchRows(*Ctx_, consumeOneOrTwo); + switch (res) { + case EFetchResult::Finish: { + if (SizeTuples() == 0) { + return NYql::NUdf::EFetchStatus::Finish; } - return NUdf::EFetchStatus::Ok; + FlushTo(output); + Finished_ = true; + return NYql::NUdf::EFetchStatus::Ok; } - - case NUdf::EFetchStatus::Yield: - return NUdf::EFetchStatus::Yield; - - case NUdf::EFetchStatus::Finish: - Cerr << "Right stream finished!" << Endl; - RightFinished_ = true; + case EFetchResult::Yield: + return NYql::NUdf::EFetchStatus::Yield; + case EFetchResult::One: break; } } - - Cerr << "Both streams finished, returning Finish" << Endl; - return NUdf::EFetchStatus::Finish; + FlushTo(output); + return NUdf::EFetchStatus::Ok; } - private: - bool LeftFinished_ = false; - bool RightFinished_ = false; - - [[maybe_unused]] const TVector& LeftKeyColumns_; - [[maybe_unused]] const TVector& RightKeyColumns_; - NUdf::TUnboxedValue LeftStream_; - NUdf::TUnboxedValue RightStream_; - const THolderFactory& HolderFactory_; - const TVector& ResultItemTypes_; - const size_t LeftStreamWidth_; - const size_t RightStreamWidth_; + private: + TJoin Join_; + TComputationContext* Ctx_; + std::vector OutputBuffer_; + std::vector> OutputItemConverters_; + const std::vector OutputTypes_; + const int Threshold = 10000; + bool Finished_ = false; + const std::vector NullTuples_{ + static_cast(std::max(Join_.BuildSize(), Join_.ProbeSize())), 
NYql::NUdf::TUnboxedValuePod{}}; }; void RegisterDependencies() const final { @@ -214,14 +228,14 @@ class TBlockHashJoinWrapper : public TMutableComputationNodeDependsOn(RightStream_); } -private: - const TVector ResultItemTypes_; - const TVector LeftItemTypes_; - const TVector LeftKeyColumns_; - const TVector RightItemTypes_; - const TVector RightKeyColumns_; - IComputationNode* LeftStream_; - IComputationNode* RightStream_; + private: + const TVector ResultItemTypes_; + const TVector LeftItemTypes_; + const TVector LeftKeyColumns_; + const TVector RightItemTypes_; + const TVector RightKeyColumns_; + IComputationNode* LeftStream_; + IComputationNode* RightStream_; }; } // namespace @@ -232,35 +246,43 @@ IComputationNode* WrapDqBlockHashJoin(TCallable& callable, const TComputationNod const auto joinType = callable.GetType()->GetReturnType(); MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream"); const auto joinStreamType = AS_TYPE(TStreamType, joinType); - MKQL_ENSURE(joinStreamType->GetItemType()->IsMulti(), - "Expected Multi as a resulting item type"); + MKQL_ENSURE(joinStreamType->GetItemType()->IsMulti(), "Expected Multi as a resulting item type"); const auto joinComponents = GetWideComponents(joinStreamType); MKQL_ENSURE(joinComponents.size() > 0, "Expected at least one column"); - const TVector joinItems(joinComponents.cbegin(), joinComponents.cend()); + TVector joinItems; + for (auto* blockType : joinComponents) { + MKQL_ENSURE(blockType->IsBlock(), "Expected block types as wide components of result stream"); + joinItems.push_back(AS_TYPE(TBlockType, blockType)->GetItemType()); + } const auto leftType = callable.GetInput(0).GetStaticType(); MKQL_ENSURE(leftType->IsStream(), "Expected WideStream as a left stream"); const auto leftStreamType = AS_TYPE(TStreamType, leftType); - MKQL_ENSURE(leftStreamType->GetItemType()->IsMulti(), - "Expected Multi as a left stream item type"); + MKQL_ENSURE(leftStreamType->GetItemType()->IsMulti(), 
"Expected Multi as a left stream item type"); const auto leftStreamComponents = GetWideComponents(leftStreamType); MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column"); - const TVector leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend()); + TVector leftStreamItems; + for (auto* blockType : leftStreamComponents) { + MKQL_ENSURE(blockType->IsBlock(), "Expected block types as wide components of left stream"); + leftStreamItems.push_back(AS_TYPE(TBlockType, blockType)->GetItemType()); + } const auto rightType = callable.GetInput(1).GetStaticType(); MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream"); const auto rightStreamType = AS_TYPE(TStreamType, rightType); - MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(), - "Expected Multi as a right stream item type"); + MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(), "Expected Multi as a right stream item type"); const auto rightStreamComponents = GetWideComponents(rightStreamType); MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); - const TVector rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); + TVector rightStreamItems; + for (auto* blockType : rightStreamComponents) { + MKQL_ENSURE(blockType->IsBlock(), "Expected block types as wide components of right stream"); + rightStreamItems.push_back(AS_TYPE(TBlockType, blockType)->GetItemType()); + } const auto joinKindNode = callable.GetInput(2); const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get(); const auto joinKind = GetJoinKind(rawKind); - MKQL_ENSURE(joinKind == EJoinKind::Inner, - "Only inner join is supported in block hash join prototype"); + MKQL_ENSURE(joinKind != EJoinKind::Cross, "Only inner join is supported in block hash join prototype"); const auto leftKeyColumnsLiteral = callable.GetInput(3); const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral); @@ -284,19 +306,11 @@ 
IComputationNode* WrapDqBlockHashJoin(TCallable& callable, const TComputationNod const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0); const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1); - - return new TBlockHashJoinWrapper( - ctx.Mutables, - std::move(joinItems), - std::move(leftStreamItems), - std::move(leftKeyColumns), - std::move(rightStreamItems), - std::move(rightKeyColumns), - leftStream, - rightStream - ); + return std::visit([&](auto kind) -> IComputationNode* { + return new TBlockHashJoinWrapper( + ctx.Mutables, std::move(joinItems), std::move(leftStreamItems), std::move(leftKeyColumns), + std::move(rightStreamItems), std::move(rightKeyColumns), leftStream, rightStream); + }, TypifyJoinKind(joinKind)); } } // namespace NKikimr::NMiniKQL - - diff --git a/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h b/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h index dbe71bcd35de..818a30c23996 100644 --- a/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h +++ b/ydb/library/yql/dq/comp_nodes/dq_hash_join_table.h @@ -1,10 +1,12 @@ #pragma once #include "type_utils.h" +#include #include namespace NKikimr::NMiniKQL::NJoinTable { using TTuple = const NYql::NUdf::TUnboxedValue*; +using TSizedTuple = std::span; bool NeedToTrackUnusedRightTuples(EJoinKind kind); @@ -15,16 +17,18 @@ class TStdJoinTable { std::vector Tuples; bool Used; }; + public: - TStdJoinTable(int tupleSize, NKikimr::NMiniKQL::TWideUnboxedEqual eq, NKikimr::NMiniKQL::TWideUnboxedHasher hash, bool trackUnusedTuples) + TStdJoinTable(int tupleSize, NKikimr::NMiniKQL::TWideUnboxedEqual eq, NKikimr::NMiniKQL::TWideUnboxedHasher hash, + bool trackUnusedTuples) : TupleSize(tupleSize) , TrackUnusedTuples(trackUnusedTuples) , BuiltTable(1, hash, eq) {} - void Add(std::span tuple) { + void Add(TSizedTuple tuple) { MKQL_ENSURE(BuiltTable.empty(), "JoinTable is built already"); - MKQL_ENSURE(std::ssize(tuple) == TupleSize, "tuple size promise vs actual mismatch"); + 
MKQL_ENSURE(std::ssize(tuple) == TupleSize, Sprintf("tuple size promise(%i) vs actual(%i) mismatch", TupleSize, std::ssize(tuple))); for (int idx = 0; idx < TupleSize; ++idx) { Tuples.push_back(tuple[idx]); } @@ -34,14 +38,15 @@ class TStdJoinTable { MKQL_ENSURE(BuiltTable.empty(), "JoinTable is built already"); for (int index = 0; index < std::ssize(Tuples); index += TupleSize) { TTuple thisTuple = &Tuples[index]; - auto [it, ok] = BuiltTable.emplace(thisTuple, TuplesWithSameJoinKey{.Tuples = std::vector{thisTuple}, .Used = !TrackUnusedTuples}); + auto [it, ok] = BuiltTable.emplace( + thisTuple, TuplesWithSameJoinKey{.Tuples = std::vector{thisTuple}, .Used = !TrackUnusedTuples}); if (!ok) { it->second.Tuples.emplace_back(thisTuple); } } } - void Lookup(TTuple key, std::function produce) { + void Lookup(TTuple key, std::invocable auto produce) { auto it = BuiltTable.find(key); if (it != BuiltTable.end()) { it->second.Used = true; @@ -49,16 +54,17 @@ class TStdJoinTable { } } - bool UnusedTrackingOn() const { + bool UnusedTrackingOn() const { return TrackUnusedTuples; } const auto& MapView() const { return BuiltTable; } + void ForEachUnused(std::function produce) { MKQL_ENSURE(TrackUnusedTuples, "wasn't tracking tuples at all"); - for(auto& tuplesSameKey: BuiltTable) { + for (auto& tuplesSameKey : BuiltTable) { if (!tuplesSameKey.second.Used) { std::ranges::for_each(tuplesSameKey.second.Tuples, produce); tuplesSameKey.second.Used = true; diff --git a/ydb/library/yql/dq/comp_nodes/dq_join_common.cpp b/ydb/library/yql/dq/comp_nodes/dq_join_common.cpp new file mode 100644 index 000000000000..732121d8114a --- /dev/null +++ b/ydb/library/yql/dq/comp_nodes/dq_join_common.cpp @@ -0,0 +1,39 @@ +#include "dq_join_common.h" +#include +namespace NKikimr::NMiniKQL { + +TKeyTypes KeyTypesFromColumns(const std::vector& types, const std::vector& keyIndexes) { + TKeyTypes kt; + std::ranges::copy(keyIndexes | std::views::transform([&types](ui32 typeIndex) { + const TType* type = 
types[typeIndex]; + MKQL_ENSURE(type->IsData(), "exepected data type"); + return std::pair{*AS_TYPE(TDataType, type)->GetDataSlot(), false}; + }), std::back_inserter(kt)); + return kt; +} + +TTypedJoinKind TypifyJoinKind(EJoinKind kind) { + switch (kind) { + case EJoinKind::LeftOnly: + return TJoinKindTag{}; + case EJoinKind::Inner: + return TJoinKindTag{}; + case EJoinKind::RightOnly: + return TJoinKindTag{}; + case EJoinKind::Left: + return TJoinKindTag{}; + case EJoinKind::Right: + return TJoinKindTag{}; + case EJoinKind::Exclusion: + return TJoinKindTag{}; + case EJoinKind::Full: + return TJoinKindTag{}; + case EJoinKind::LeftSemi: + return TJoinKindTag{}; + case EJoinKind::RightSemi: + return TJoinKindTag{}; + default: + MKQL_ENSURE(false, "unsupported join kind"); + } +} +} // namespace NKikimr::NMiniKQL \ No newline at end of file diff --git a/ydb/library/yql/dq/comp_nodes/dq_join_common.h b/ydb/library/yql/dq/comp_nodes/dq_join_common.h new file mode 100644 index 000000000000..57d020d2b8ec --- /dev/null +++ b/ydb/library/yql/dq/comp_nodes/dq_join_common.h @@ -0,0 +1,193 @@ +#pragma once +#include "dq_hash_join_table.h" +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NMiniKQL { +struct TColumnsMetadata { + std::vector KeyColumns; + std::vector ColumnTypes; +}; + +struct TJoinMetadata { + TColumnsMetadata Build; + TColumnsMetadata Probe; + TKeyTypes KeyTypes; +}; + +template struct TJoinKindTag { + static constexpr EJoinKind Kind_ = Kind; +}; + +using TTypedJoinKind = + std::variant, + TJoinKindTag, + TJoinKindTag, + TJoinKindTag, + TJoinKindTag, + TJoinKindTag, + TJoinKindTag, + TJoinKindTag, + TJoinKindTag>; + +TTypedJoinKind TypifyJoinKind(EJoinKind kind); + +TKeyTypes KeyTypesFromColumns(const std::vector& types, const std::vector& keyIndexes); + +constexpr bool SemiOrOnlyJoin(EJoinKind kind) { + switch (kind) { + using enum EJoinKind; + case RightOnly: + case RightSemi: + case LeftOnly: + case LeftSemi: + return true; + 
default: + return false; + } +} + +constexpr bool ContainsRowsFromInnerJoin(EJoinKind kind) { // true if kind is a join that contains all rows from inner join output. + switch (kind) { + using enum EJoinKind; + case Inner: + case Full: + case Left: + case Right: + case Cross: + return true; + default: + return false; + } +} + +// Some joins produce concatenation of 2 tuples, some produce one tuple(effectively) +template +concept JoinMatchFun = std::invocable || std::invocable; + +template class TJoin : public TComputationValue> { + using TBase = TComputationValue; + + public: + TJoin(TMemoryUsageInfo* memInfo, Source probe, Source build, TJoinMetadata meta, NUdf::TLoggerPtr logger, + TString componentName) + : TBase(memInfo) + , Meta_(meta) + , Logger_(logger) + , LogComponent_(logger->RegisterComponent(componentName)) + , Build_(std::move(build)) + , Probe_(std::move(probe)) + , Table_(BuildSize(), TWideUnboxedEqual{Meta_.KeyTypes}, TWideUnboxedHasher{Meta_.KeyTypes}, + NJoinTable::NeedToTrackUnusedRightTuples(Kind)) + { + MKQL_ENSURE(BuildSize() == ProbeSize(), "unimplemented"); + MKQL_ENSURE(Kind != EJoinKind::Cross, "Unsupported join kind"); + UDF_LOG(Logger_, LogComponent_, NUdf::ELogLevel::Debug, "TScalarHashJoinState created"); + } + + const TJoinMetadata& Meta() const { + return Meta_; + } + + int ProbeSize() const { + return Probe_.UserDataSize(); + } + + int BuildSize() const { + return Build_.UserDataSize(); + } + + + EFetchResult MatchRows(TComputationContext& ctx, JoinMatchFun auto consumeOneOrTwoTuples) { + while (!Build_.Finished()) { + auto res = Build_.ForEachRow(ctx, [&](auto tuple) { Table_.Add({tuple, tuple + Build_.UserDataSize()}); }); + switch (res) { + case NYql::NUdf::EFetchStatus::Finish: { + Table_.Build(); + break; + } + case NYql::NUdf::EFetchStatus::Yield: { + return EFetchResult::Yield; + } + case NYql::NUdf::EFetchStatus::Ok: { + break; + } + default: + MKQL_ENSURE(false, "unreachable"); + } + } + if (!Probe_.Finished()) { + auto 
result = Probe_.ForEachRow(ctx, [&](NJoinTable::TTuple probeTuple) { + bool found = false; + Table_.Lookup(probeTuple, [&](NJoinTable::TTuple matchedBuildTuple) { + if constexpr (ContainsRowsFromInnerJoin(Kind)) { + consumeOneOrTwoTuples(probeTuple, matchedBuildTuple); + } + found = true; + }); + if (!found) { + if constexpr (Kind == EJoinKind::Exclusion || Kind == EJoinKind::Left || Kind == EJoinKind::Full) { + consumeOneOrTwoTuples(probeTuple, nullptr); + } + if constexpr (Kind == EJoinKind::LeftOnly) { + consumeOneOrTwoTuples(probeTuple); + } + } + if constexpr (Kind == EJoinKind::LeftSemi) { + if (found) { + consumeOneOrTwoTuples(probeTuple); + } + } + }); + switch (result) { + case NYql::NUdf::EFetchStatus::Finish: { + if (Table_.UnusedTrackingOn()) { + if constexpr (Kind == EJoinKind::RightSemi) { + for (auto& v : Table_.MapView()) { + if (v.second.Used) { + for (NJoinTable::TTuple used : v.second.Tuples) { + consumeOneOrTwoTuples(used); + } + } + } + } + Table_.ForEachUnused([&](NJoinTable::TTuple unused) { + if constexpr (Kind == EJoinKind::RightOnly) { + consumeOneOrTwoTuples(unused); + } + if constexpr (Kind == EJoinKind::Exclusion || Kind == EJoinKind::Right || + Kind == EJoinKind::Full) { + consumeOneOrTwoTuples(nullptr, unused); + } + }); + } + return EFetchResult::One; + } + case NYql::NUdf::EFetchStatus::Yield: { + return EFetchResult::Yield; + } + case NYql::NUdf::EFetchStatus::Ok: { + return EFetchResult::One; + } + default: + MKQL_ENSURE(false, "unreachable"); + } + } + return EFetchResult::Finish; + } + + private: + const TJoinMetadata Meta_; + const NUdf::TLoggerPtr Logger_; + const NUdf::TLogComponentId LogComponent_; + + Source Build_; + Source Probe_; + NJoinTable::TStdJoinTable Table_; +}; + +} // namespace NKikimr::NMiniKQL \ No newline at end of file diff --git a/ydb/library/yql/dq/comp_nodes/dq_program_builder.cpp b/ydb/library/yql/dq/comp_nodes/dq_program_builder.cpp index ea000d383565..f623924a37c7 100644 --- 
a/ydb/library/yql/dq/comp_nodes/dq_program_builder.cpp +++ b/ydb/library/yql/dq/comp_nodes/dq_program_builder.cpp @@ -7,18 +7,14 @@ namespace NKikimr { namespace NMiniKQL { - TDqProgramBuilder::TDqProgramBuilder(const TTypeEnvironment& env, const IFunctionRegistry& functionRegistry) - : TProgramBuilder(env, functionRegistry) {} + : TProgramBuilder(env, functionRegistry) +{} TCallableBuilder TDqProgramBuilder::BuildCommonCombinerParams( - const TStringBuf operatorName, - const TRuntimeNode operatorParams, - const TRuntimeNode flowOrStream, - const TProgramBuilder::TWideLambda& keyExtractor, - const TProgramBuilder::TBinaryWideLambda& init, - const TProgramBuilder::TTernaryWideLambda& update, - const TProgramBuilder::TBinaryWideLambda& finish) + const TStringBuf operatorName, const TRuntimeNode operatorParams, const TRuntimeNode flowOrStream, + const TProgramBuilder::TWideLambda& keyExtractor, const TProgramBuilder::TBinaryWideLambda& init, + const TProgramBuilder::TTernaryWideLambda& update, const TProgramBuilder::TBinaryWideLambda& finish) { const auto wideComponents = GetWideComponents(flowOrStream.GetStaticType()); const bool isFlow = flowOrStream.GetStaticType()->IsFlow(); @@ -33,36 +29,42 @@ TCallableBuilder TDqProgramBuilder::BuildCommonCombinerParams( itemArgs.reserve(unblockedWideComponents.size()); auto i = 0U; - std::generate_n(std::back_inserter(itemArgs), unblockedWideComponents.size(), [&](){ return Arg(unblockedWideComponents[i++]); }); + std::generate_n(std::back_inserter(itemArgs), unblockedWideComponents.size(), + [&]() { return Arg(unblockedWideComponents[i++]); }); const auto keys = keyExtractor(itemArgs); TRuntimeNode::TList keyArgs; keyArgs.reserve(keys.size()); - std::transform(keys.cbegin(), keys.cend(), std::back_inserter(keyArgs), [&](TRuntimeNode key){ return Arg(key.GetStaticType()); } ); + std::transform(keys.cbegin(), keys.cend(), std::back_inserter(keyArgs), + [&](TRuntimeNode key) { return Arg(key.GetStaticType()); }); const auto first 
= init(keyArgs, itemArgs); TRuntimeNode::TList stateArgs; stateArgs.reserve(first.size()); - std::transform(first.cbegin(), first.cend(), std::back_inserter(stateArgs), [&](TRuntimeNode state){ return Arg(state.GetStaticType()); } ); + std::transform(first.cbegin(), first.cend(), std::back_inserter(stateArgs), + [&](TRuntimeNode state) { return Arg(state.GetStaticType()); }); const auto next = update(keyArgs, itemArgs, stateArgs); MKQL_ENSURE(next.size() == first.size(), "Mismatch init and update state size."); TRuntimeNode::TList finishKeyArgs; finishKeyArgs.reserve(keys.size()); - std::transform(keys.cbegin(), keys.cend(), std::back_inserter(finishKeyArgs), [&](TRuntimeNode key){ return Arg(key.GetStaticType()); } ); + std::transform(keys.cbegin(), keys.cend(), std::back_inserter(finishKeyArgs), + [&](TRuntimeNode key) { return Arg(key.GetStaticType()); }); TRuntimeNode::TList finishStateArgs; finishStateArgs.reserve(next.size()); - std::transform(next.cbegin(), next.cend(), std::back_inserter(finishStateArgs), [&](TRuntimeNode state){ return Arg(state.GetStaticType()); } ); + std::transform(next.cbegin(), next.cend(), std::back_inserter(finishStateArgs), + [&](TRuntimeNode state) { return Arg(state.GetStaticType()); }); const auto output = finish(finishKeyArgs, finishStateArgs); std::vector outputWideComponents; outputWideComponents.reserve(output.size() + hasBlocks ? 
1 : 0); - std::transform(output.cbegin(), output.cend(), std::back_inserter(outputWideComponents), std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); + std::transform(output.cbegin(), output.cend(), std::back_inserter(outputWideComponents), + std::bind(&TRuntimeNode::GetStaticType, std::placeholders::_1)); if (hasBlocks) { WrapArrayBlockTypes(outputWideComponents, *this); auto blockSizeType = NewDataType(NUdf::TDataType::Id); @@ -89,46 +91,39 @@ TCallableBuilder TDqProgramBuilder::BuildCommonCombinerParams( return callableBuilder; } -TRuntimeNode TDqProgramBuilder::DqHashCombine(TRuntimeNode flow, ui64 memLimit, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish) +TRuntimeNode TDqProgramBuilder::DqHashCombine(TRuntimeNode flow, ui64 memLimit, const TWideLambda& keyExtractor, + const TBinaryWideLambda& init, const TTernaryWideLambda& update, + const TBinaryWideLambda& finish) { TRuntimeNode::TList operatorParamsList; operatorParamsList.push_back(NewDataLiteral(memLimit)); TRuntimeNode operatorParams = NewTuple(operatorParamsList); - TCallableBuilder callableBuilder = BuildCommonCombinerParams( - "DqHashCombine"sv, - operatorParams, - flow, - keyExtractor, - init, - update, - finish); + TCallableBuilder callableBuilder = + BuildCommonCombinerParams("DqHashCombine"sv, operatorParams, flow, keyExtractor, init, update, finish); return TRuntimeNode(callableBuilder.Build(), false); } -TRuntimeNode TDqProgramBuilder::DqHashAggregate(TRuntimeNode flow, const bool spilling, const TWideLambda& keyExtractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish) +TRuntimeNode TDqProgramBuilder::DqHashAggregate(TRuntimeNode flow, const bool spilling, const TWideLambda& keyExtractor, + const TBinaryWideLambda& init, const TTernaryWideLambda& update, + const TBinaryWideLambda& finish) { TRuntimeNode::TList operatorParamsList; 
operatorParamsList.push_back(NewDataLiteral(spilling)); TRuntimeNode operatorParams = NewTuple(operatorParamsList); - TCallableBuilder callableBuilder = BuildCommonCombinerParams( - "DqHashAggregate"sv, - operatorParams, - flow, - keyExtractor, - init, - update, - finish); + TCallableBuilder callableBuilder = + BuildCommonCombinerParams("DqHashAggregate"sv, operatorParams, flow, keyExtractor, init, update, finish); return TRuntimeNode(callableBuilder.Build(), false); } TRuntimeNode TDqProgramBuilder::DqBlockHashJoin(TRuntimeNode leftStream, TRuntimeNode rightStream, EJoinKind joinKind, - const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, TType* returnType) { + const TArrayRef& leftKeyColumns, + const TArrayRef& rightKeyColumns, TType* returnType) { - MKQL_ENSURE(joinKind == EJoinKind::Inner, "Unsupported join kind"); + MKQL_ENSURE(joinKind != EJoinKind::Cross, "Unsupported join kind"); MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch"); MKQL_ENSURE(!leftKeyColumns.empty(), "At least one key column must be specified"); @@ -137,18 +132,13 @@ TRuntimeNode TDqProgramBuilder::DqBlockHashJoin(TRuntimeNode leftStream, TRuntim TRuntimeNode::TList leftKeyColumnsNodes; leftKeyColumnsNodes.reserve(leftKeyColumns.size()); - std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), - std::back_inserter(leftKeyColumnsNodes), [this](const ui32 idx) { - return NewDataLiteral(idx); - }); + std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), std::back_inserter(leftKeyColumnsNodes), + [this](const ui32 idx) { return NewDataLiteral(idx); }); TRuntimeNode::TList rightKeyColumnsNodes; rightKeyColumnsNodes.reserve(rightKeyColumns.size()); - std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(), - std::back_inserter(rightKeyColumnsNodes), [this](const ui32 idx) { - return NewDataLiteral(idx); - }); - + std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(), std::back_inserter(rightKeyColumnsNodes), + 
[this](const ui32 idx) { return NewDataLiteral(idx); }); TCallableBuilder callableBuilder(Env, __func__, returnType); callableBuilder.Add(leftStream); @@ -161,7 +151,8 @@ TRuntimeNode TDqProgramBuilder::DqBlockHashJoin(TRuntimeNode leftStream, TRuntim } TRuntimeNode TDqProgramBuilder::DqScalarHashJoin(TRuntimeNode leftFlow, TRuntimeNode rightFlow, EJoinKind joinKind, - const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, TType* returnType) { + const TArrayRef& leftKeyColumns, + const TArrayRef& rightKeyColumns, TType* returnType) { MKQL_ENSURE(joinKind != EJoinKind::Cross, "Unsupported join kind"); MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key column count mismatch"); @@ -169,17 +160,13 @@ TRuntimeNode TDqProgramBuilder::DqScalarHashJoin(TRuntimeNode leftFlow, TRuntime TRuntimeNode::TList leftKeyColumnsNodes; leftKeyColumnsNodes.reserve(leftKeyColumns.size()); - std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), - std::back_inserter(leftKeyColumnsNodes), [this](const ui32 idx) { - return NewDataLiteral(idx); - }); + std::transform(leftKeyColumns.cbegin(), leftKeyColumns.cend(), std::back_inserter(leftKeyColumnsNodes), + [this](const ui32 idx) { return NewDataLiteral(idx); }); TRuntimeNode::TList rightKeyColumnsNodes; rightKeyColumnsNodes.reserve(rightKeyColumns.size()); - std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(), - std::back_inserter(rightKeyColumnsNodes), [this](const ui32 idx) { - return NewDataLiteral(idx); - }); + std::transform(rightKeyColumns.cbegin(), rightKeyColumns.cend(), std::back_inserter(rightKeyColumnsNodes), + [this](const ui32 idx) { return NewDataLiteral(idx); }); TCallableBuilder callableBuilder(Env, __func__, returnType); callableBuilder.Add(leftFlow); @@ -191,5 +178,10 @@ TRuntimeNode TDqProgramBuilder::DqScalarHashJoin(TRuntimeNode leftFlow, TRuntime return TRuntimeNode(callableBuilder.Build(), false); } +TType* TDqProgramBuilder::LastScalarIndexBlock() { + return 
NewBlockType(NewDataType(NUdf::TDataType::Id), TBlockType::EShape::Scalar); } -} + + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/dq/comp_nodes/dq_program_builder.h b/ydb/library/yql/dq/comp_nodes/dq_program_builder.h index f8e386e6c4be..c2550c2e3e73 100644 --- a/ydb/library/yql/dq/comp_nodes/dq_program_builder.h +++ b/ydb/library/yql/dq/comp_nodes/dq_program_builder.h @@ -18,6 +18,8 @@ class TDqProgramBuilder: public TProgramBuilder { TRuntimeNode DqScalarHashJoin(TRuntimeNode leftFlow, TRuntimeNode rightFlow, EJoinKind joinKind, const TArrayRef& leftKeyColumns, const TArrayRef& rightKeyColumns, TType* returnType); + TType* LastScalarIndexBlock(); + protected: TCallableBuilder BuildCommonCombinerParams( const TStringBuf operatorName, diff --git a/ydb/library/yql/dq/comp_nodes/dq_scalar_hash_join.cpp b/ydb/library/yql/dq/comp_nodes/dq_scalar_hash_join.cpp index ef216c5acd0c..7f9344bb29fd 100644 --- a/ydb/library/yql/dq/comp_nodes/dq_scalar_hash_join.cpp +++ b/ydb/library/yql/dq/comp_nodes/dq_scalar_hash_join.cpp @@ -1,250 +1,161 @@ #include "dq_scalar_hash_join.h" +#include "dq_join_common.h" #include #include #include -#include #include +#include #include -#include namespace NKikimr::NMiniKQL { namespace { -TKeyTypes KeyTypesFromColumns(const std::vector& types, const std::vector& keyIndexes) { - TKeyTypes kt; - std::ranges::copy(keyIndexes | std::views::transform([&types](ui32 typeIndex) { - const TType* type = types[typeIndex]; - MKQL_ENSURE(type->IsData(), "exepected data type"); - return std::pair{*static_cast(type)->GetDataSlot(), false}; - }), std::back_inserter(kt)); - return kt; -} - -bool SemiOrOnlyJoin(EJoinKind kind) { - switch (kind) { - using enum EJoinKind; - case RightOnly: - case RightSemi: - case LeftOnly: - case LeftSemi: - return true; - default: - return false; +class TScalarRowSource : public NNonCopyable::TMoveOnly { + public: + TScalarRowSource(IComputationWideFlowNode* flow, const std::vector& types) + : 
Flow_(flow) + , ConsumeBuff_(types.size()) + , Pointers_(types.size()) + { + for (int index = 0; index < std::ssize(types); ++index) { + Pointers_[index] = &ConsumeBuff_[index]; + } + MKQL_ENSURE(std::ranges::is_permutation( + ConsumeBuff_ | std::views::transform([](auto& value) { return &value; }), Pointers_), + "Pointers_ should be a permutation of ConsumeBuff_ addresses"); } -} -bool IsInner(EJoinKind kind) { - switch (kind) { - using enum EJoinKind; - case Inner: - case Full: - case Left: - case Right: - return true; - default: - return false; + bool Finished() const { + return Finished_; } -} -class TScalarHashJoinState : public TComputationValue { - using TBase = TComputationValue; - IComputationWideFlowNode* BuildSide() const { - return RightFinished_ ? nullptr : RightFlow_; + int UserDataSize() const { + return ConsumeBuff_.size(); } - IComputationWideFlowNode* ProbeSide() const { - return LeftFinished_ ? nullptr : LeftFlow_; - } - void AppendTuple(NJoinTable::TTuple left, NJoinTable::TTuple right, std::vector& output) { - MKQL_ENSURE(left || right,"appending invalid tuple"); - auto outIt = std::back_inserter(output); - if (left) { - std::copy_n(left,std::ssize(LeftColumnTypes_), outIt); - } else { - std::copy_n(NullTuples.data(),std::ssize(LeftColumnTypes_), outIt); + NYql::NUdf::EFetchStatus ForEachRow(TComputationContext& ctx, std::invocable auto consume) { + auto res = Flow_->FetchValues(ctx, Pointers_.data()); + switch (res) { + case EFetchResult::Finish: { + Finished_ = true; + return NYql::NUdf::EFetchStatus::Finish; + } + case EFetchResult::Yield: { + return NYql::NUdf::EFetchStatus::Yield; + } + case EFetchResult::One: { + consume(ConsumeBuff_.data()); + return NYql::NUdf::EFetchStatus::Ok; } - if (right) { - std::copy_n(right,std::ssize(RightColumnTypes_), outIt); - } else { - std::copy_n(NullTuples.data(),std::ssize(RightColumnTypes_), outIt); } } -public: - - TScalarHashJoinState(TMemoryUsageInfo* memInfo, - IComputationWideFlowNode* leftFlow, 
IComputationWideFlowNode* rightFlow, - const std::vector& leftKeyColumns, const std::vector& rightKeyColumns, - const std::vector& leftColumnTypes, const std::vector& rightColumnTypes, [[maybe_unused]] TComputationContext& ctx, - NUdf::TLoggerPtr logger, NUdf::TLogComponentId logComponent, EJoinKind joinKind) - : TBase(memInfo) - , LeftFlow_(leftFlow) - , RightFlow_(rightFlow) - , LeftKeyColumns_(leftKeyColumns) - , RightKeyColumns_(rightKeyColumns) - , LeftColumnTypes_(leftColumnTypes) - , RightColumnTypes_(rightColumnTypes) - , Logger_(logger) - , LogComponent_(logComponent) - , KeyTypes_(KeyTypesFromColumns(leftColumnTypes, leftKeyColumns)) - , JoinKind_(joinKind) - , Table_( - std::ssize(rightColumnTypes) - , TWideUnboxedEqual{KeyTypes_} - , TWideUnboxedHasher{KeyTypes_} - , NJoinTable::NeedToTrackUnusedRightTuples(joinKind)) - , Values_(rightColumnTypes.size()) - , Pointers_() - , Output_() - { - MKQL_ENSURE(RightColumnTypes_.size() == LeftColumnTypes_.size(), "unimplemented"); - MKQL_ENSURE(joinKind != EJoinKind::Cross, "Unsupported join kind"); - Pointers_.resize(LeftColumnTypes_.size()); - for (int index = 0; index < std::ssize(LeftKeyColumns_); ++index) { - Pointers_[LeftKeyColumns_[index]] = &Values_[index]; - } - int valuesIndex = 0; - for(int index = 0; index < std::ssize(Pointers_); ++index) { - if (!Pointers_[index]) { - Pointers_[index] = &Values_[ std::ssize(LeftKeyColumns_) + valuesIndex]; - valuesIndex++; + private: + bool Finished_ = false; + IComputationWideFlowNode* Flow_; + std::vector ConsumeBuff_; + std::vector Pointers_; +}; + +template class TScalarHashJoinState : public TComputationValue> { + public: + TScalarHashJoinState(TMemoryUsageInfo* memInfo, IComputationWideFlowNode* leftFlow, + IComputationWideFlowNode* rightFlow, const std::vector& leftKeyColumns, + const std::vector& rightKeyColumns, const std::vector& leftColumnTypes, + const std::vector& rightColumnTypes, NUdf::TLoggerPtr logger, TString componentName) + : 
NKikimr::NMiniKQL::TComputationValue(memInfo) + , Join_(memInfo, TScalarRowSource{leftFlow, leftColumnTypes}, TScalarRowSource{rightFlow, rightColumnTypes}, + TJoinMetadata{TColumnsMetadata{rightKeyColumns, rightColumnTypes}, + TColumnsMetadata{leftKeyColumns, leftColumnTypes}, + KeyTypesFromColumns(leftColumnTypes, leftKeyColumns)}, logger, componentName) + {} + + int TupleSize() const { + if constexpr (Kind == EJoinKind::LeftOnly || Kind == EJoinKind::LeftSemi) { + return Join_.ProbeSize(); + } else { + if constexpr (Kind == EJoinKind::RightOnly || Kind == EJoinKind::RightSemi) { + return Join_.BuildSize(); + } else { + return Join_.BuildSize() + Join_.ProbeSize(); } } - MKQL_ENSURE(std::ranges::is_permutation(Values_ | std::views::transform([](auto& value) {return &value;}), Pointers_), "Pointers_ should be a permutation of Values_ addresses"); + } - UDF_LOG(Logger_, LogComponent_, NUdf::ELogLevel::Debug, "TScalarHashJoinState created"); + int SizeTuples() const { + MKQL_ENSURE(OutputBuffer_.size() % TupleSize() == 0, "buffer contains tuple parts??"); + return OutputBuffer_.size() / TupleSize(); } EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) { - const int outputTupleSize = [&] { - if (SemiOrOnlyJoin(JoinKind_)) { - return std::ssize(RightColumnTypes_); - } else { - return std::ssize(RightColumnTypes_) * 2; - } - }(); - if (auto* buildSide = BuildSide()) { - auto res = buildSide->FetchValues(ctx, Pointers_.data()); - switch (res) { - - case EFetchResult::Finish: { - Table_.Build(); - RightFinished_ = true; - return EFetchResult::Yield; - } - case EFetchResult::Yield: { - return EFetchResult::Yield; - } - case EFetchResult::One: { - Table_.Add(Values_); - return EFetchResult::Yield; - } - default: - MKQL_ENSURE(false, "unreachable"); - } - } - if (!Output_.empty()) { - MKQL_ENSURE(std::ssize(Output_) >= outputTupleSize, - "Output_ must contain at least one tuple"); - for (int index = 0; index < outputTupleSize; ++index) { 
- int myIndex = std::ssize(Output_) - outputTupleSize + index; - int theirIndex = index; - *output[theirIndex] = Output_[myIndex]; - } - Output_.resize(std::ssize(Output_) - outputTupleSize); - return EFetchResult::One; - } - if (ProbeSide()) { - auto result = LeftFlow_->FetchValues(ctx, Pointers_.data()); - switch (result) { - case EFetchResult::Finish: { - LeftFinished_ = true; - if (Table_.UnusedTrackingOn()) { - for (auto& v : Table_.MapView()) { - if (v.second.Used && JoinKind_ == EJoinKind::RightSemi ) { - for( NJoinTable::TTuple used: v.second.Tuples ) { - std::copy_n(used, std::ssize(RightColumnTypes_), std::back_inserter(Output_)); - } + while (SizeTuples() == 0) { + auto consumeOneOrTwo = [&] { + if constexpr (SemiOrOnlyJoin(Kind)) { + return [&](NJoinTable::TTuple tuple) { + auto out = std::back_inserter(OutputBuffer_); + if (!tuple) { + tuple = NullTuples_.data(); } - } - Table_.ForEachUnused([this](NJoinTable::TTuple unused) { - if (JoinKind_ == EJoinKind::RightOnly) { - std::copy_n(unused, std::ssize(RightColumnTypes_), std::back_inserter(Output_)); + std::copy_n(tuple, Join_.ProbeSize(), out); + }; + } else { + return [&](NJoinTable::TTuple probe, NJoinTable::TTuple build) { + auto out = std::back_inserter(OutputBuffer_); + if (!probe) { + probe = NullTuples_.data(); } - if (JoinKind_ == EJoinKind::Exclusion || JoinKind_ == EJoinKind::Right || JoinKind_ == EJoinKind::Full) { - AppendTuple(nullptr, unused, Output_); + std::copy_n(probe, Join_.ProbeSize(), out); + + if (!build) { + build = NullTuples_.data(); } - }); - } - - return EFetchResult::Yield; - } - case EFetchResult::Yield: { - return EFetchResult::Yield; - } - case EFetchResult::One: { - bool found = false; - Table_.Lookup(Values_.data(), [this, &found](NJoinTable::TTuple matched) { - if (IsInner(JoinKind_)) { - AppendTuple(Values_.data(),matched,Output_); - } - found = true; - }); - if (!found && JoinKind_ == EJoinKind::LeftOnly || found && JoinKind_ == EJoinKind::LeftSemi) { - 
std::copy(Values_.data(), Values_.data() + std::ssize(LeftColumnTypes_), std::back_inserter(Output_)); + std::copy_n(build, Join_.BuildSize(), out); + }; } - if (!found && (JoinKind_ == EJoinKind::Exclusion || JoinKind_ == EJoinKind::Left || JoinKind_ == EJoinKind::Full)) { - AppendTuple(Values_.data(), nullptr, Output_); - } - return EFetchResult::Yield; - } - default: - MKQL_ENSURE(false, "unreachable"); + }(); + auto res = Join_.MatchRows(ctx, consumeOneOrTwo); + switch (res) { + + case EFetchResult::Finish: + return res; + case EFetchResult::Yield: + return res; + case EFetchResult::One: + break; } } - return EFetchResult::Finish; + const int outputTupleSize = TupleSize(); + MKQL_ENSURE(std::ssize(OutputBuffer_) >= outputTupleSize, "Output_ must contain at least one tuple"); + for (int index = 0; index < outputTupleSize; ++index) { + int myIndex = std::ssize(OutputBuffer_) - outputTupleSize + index; + int theirIndex = index; + *output[theirIndex] = OutputBuffer_[myIndex]; + } + OutputBuffer_.resize(std::ssize(OutputBuffer_) - outputTupleSize); + return EFetchResult::One; } -private: - IComputationWideFlowNode* const LeftFlow_; - IComputationWideFlowNode* const RightFlow_; - - const std::vector LeftKeyColumns_; - const std::vector RightKeyColumns_; - const std::vector LeftColumnTypes_; - const std::vector RightColumnTypes_; - - const NUdf::TLoggerPtr Logger_; - const NUdf::TLogComponentId LogComponent_; - const TKeyTypes KeyTypes_; - const EJoinKind JoinKind_; - const std::vector NullTuples{std::max(std::size(LeftColumnTypes_), std::size(RightColumnTypes_)), NYql::NUdf::TUnboxedValuePod{}}; - - bool LeftFinished_ = false; - bool RightFinished_ = false; - NJoinTable::TStdJoinTable Table_; - std::vector Values_; - std::vector Pointers_; - std::vector Output_; + private: + TJoin Join_; + std::vector OutputBuffer_; + const std::vector NullTuples_{ + static_cast(std::max(Join_.BuildSize(), Join_.ProbeSize())), NYql::NUdf::TUnboxedValuePod{}}; }; -class 
TScalarHashJoinWrapper : public TStatefulWideFlowComputationNode { -private: +template +class TScalarHashJoinWrapper : public TStatefulWideFlowComputationNode> { + private: using TBaseComputation = TStatefulWideFlowComputationNode; -public: - TScalarHashJoinWrapper( - TComputationMutables& mutables, - IComputationWideFlowNode* leftFlow, - IComputationWideFlowNode* rightFlow, - TVector&& resultItemTypes, - TVector&& leftColumnTypes, - TVector&& leftKeyColumns, - TVector&& rightColumnTypes, - TVector&& rightKeyColumns, - EJoinKind joinKind - ) + public: + TScalarHashJoinWrapper(TComputationMutables& mutables, IComputationWideFlowNode* leftFlow, + IComputationWideFlowNode* rightFlow, + TVector&& resultItemTypes, + TVector&& leftColumnTypes, + TVector&& leftKeyColumns, + TVector&& rightColumnTypes, + TVector&& rightKeyColumns) : TBaseComputation(mutables, nullptr, EValueRepresentation::Boxed) , LeftFlow_(leftFlow) , RightFlow_(rightFlow) @@ -253,47 +164,42 @@ class TScalarHashJoinWrapper : public TStatefulWideFlowComputationNode(state.AsBoxed().Get())->FetchValues(ctx, output); + return static_cast*>(state.AsBoxed().Get())->FetchValues(ctx, output); } -private: + private: void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger(); - NYql::NUdf::TLogComponentId logComponent = logger->RegisterComponent("ScalarHashJoin"); - UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized"); - - state = ctx.HolderFactory.Create( - LeftFlow_, RightFlow_, LeftKeyColumns_, RightKeyColumns_, - LeftColumnTypes_, RightColumnTypes_, - ctx, logger, logComponent, JoinKind_); + NYql::NUdf::TLoggerPtr logger = ctx.MakeLogger(); + + state = ctx.HolderFactory.Create>(LeftFlow_, RightFlow_, LeftKeyColumns_, + RightKeyColumns_, LeftColumnTypes_, + RightColumnTypes_, logger, "ScalarHashJoin"); } void RegisterDependencies() const final { - FlowDependsOnBoth(LeftFlow_, RightFlow_); + 
this->FlowDependsOnBoth(LeftFlow_, RightFlow_); } -private: + private: IComputationWideFlowNode* const LeftFlow_; IComputationWideFlowNode* const RightFlow_; - const TVector ResultItemTypes_; - const TVector LeftColumnTypes_; - const TVector LeftKeyColumns_; - const TVector RightColumnTypes_; - const TVector RightKeyColumns_; - const EJoinKind JoinKind_; + const TVector ResultItemTypes_; + const TVector LeftColumnTypes_; + const TVector LeftKeyColumns_; + const TVector RightColumnTypes_; + const TVector RightKeyColumns_; }; } // namespace - IComputationWideFlowNode* WrapDqScalarHashJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); @@ -306,8 +212,7 @@ IComputationWideFlowNode* WrapDqScalarHashJoin(TCallable& callable, const TCompu const auto leftType = callable.GetInput(0).GetStaticType(); MKQL_ENSURE(leftType->IsFlow(), "Expected WideFlow as a left flow"); const auto leftFlowType = AS_TYPE(TFlowType, leftType); - MKQL_ENSURE(leftFlowType->GetItemType()->IsMulti(), - "Expected Multi as a left flow item type"); + MKQL_ENSURE(leftFlowType->GetItemType()->IsMulti(), "Expected Multi as a left flow item type"); const auto leftFlowComponents = GetWideComponents(leftFlowType); MKQL_ENSURE(leftFlowComponents.size() > 0, "Expected at least one column"); TVector leftFlowItems(leftFlowComponents.cbegin(), leftFlowComponents.cend()); @@ -315,8 +220,7 @@ IComputationWideFlowNode* WrapDqScalarHashJoin(TCallable& callable, const TCompu const auto rightType = callable.GetInput(1).GetStaticType(); MKQL_ENSURE(rightType->IsFlow(), "Expected WideFlow as a right flow"); const auto rightFlowType = AS_TYPE(TFlowType, rightType); - MKQL_ENSURE(rightFlowType->GetItemType()->IsMulti(), - "Expected Multi as a right flow item type"); + MKQL_ENSURE(rightFlowType->GetItemType()->IsMulti(), "Expected Multi as a right flow item type"); const auto rightFlowComponents = GetWideComponents(rightFlowType); 
MKQL_ENSURE(rightFlowComponents.size() > 0, "Expected at least one column"); TVector rightFlowItems(rightFlowComponents.cbegin(), rightFlowComponents.cend()); @@ -350,18 +254,10 @@ IComputationWideFlowNode* WrapDqScalarHashJoin(TCallable& callable, const TCompu MKQL_ENSURE(leftFlow, "Expected WideFlow as a left input"); MKQL_ENSURE(rightFlow, "Expected WideFlow as a right input"); - - return new TScalarHashJoinWrapper( - ctx.Mutables, - leftFlow, - rightFlow, - std::move(joinItems), - std::move(leftFlowItems), - std::move(leftKeyColumns), - std::move(rightFlowItems), - std::move(rightKeyColumns), - joinKind - ); + return std::visit([&](auto kind) -> IComputationWideFlowNode* { + return new TScalarHashJoinWrapper( + ctx.Mutables, leftFlow, rightFlow, std::move(joinItems), std::move(leftFlowItems), + std::move(leftKeyColumns), std::move(rightFlowItems), std::move(rightKeyColumns)); + }, TypifyJoinKind(joinKind)); } } // namespace NKikimr::NMiniKQL - diff --git a/ydb/library/yql/dq/comp_nodes/type_utils.cpp b/ydb/library/yql/dq/comp_nodes/type_utils.cpp new file mode 100644 index 000000000000..c23c713e9279 --- /dev/null +++ b/ydb/library/yql/dq/comp_nodes/type_utils.cpp @@ -0,0 +1,52 @@ +#include "type_utils.h" + +namespace NKikimr::NMiniKQL { +bool UnwrapBlockTypes(const TArrayRef& typeComponents, std::vector& result) { + bool hasBlock = false; + bool hasNonBlock = false; + + result.reserve(typeComponents.size()); + for (TType* type : typeComponents) { + if (type->GetKind() == TType::EKind::Block) { + hasBlock = true; + type = static_cast(type)->GetItemType(); + } else { + hasNonBlock = true; + } + result.push_back(type); + } + MKQL_ENSURE(hasBlock != hasNonBlock, "Inconsistent wide item types: mixing of blocks and non-blocks detected"); + return hasBlock; +} + +void WrapArrayBlockTypes(std::vector& types, const TProgramBuilder& pb) { + std::transform(types.begin(), types.end(), types.begin(), + [&](TType* type) { return pb.NewBlockType(type, 
TBlockType::EShape::Many); }); +} + +int ArrowScalarAsInt(const TArrowBlock& scalar) { + return scalar.GetDatum().scalar_as().value; +} + +bool ForceRightOptional(EJoinKind kind) { + switch (kind) { + case EJoinKind::Left: + case EJoinKind::Exclusion: + case EJoinKind::Full: + return true; + default: + return false; + } +} + +bool ForceLeftOptional(EJoinKind kind) { + switch (kind) { + case EJoinKind::Right: + case EJoinKind::Exclusion: + case EJoinKind::Full: + return true; + default: + return false; + } +} +} // namespace NKikimr::NMiniKQL \ No newline at end of file diff --git a/ydb/library/yql/dq/comp_nodes/type_utils.h b/ydb/library/yql/dq/comp_nodes/type_utils.h index 5709637a1ce8..9371787c6e10 100644 --- a/ydb/library/yql/dq/comp_nodes/type_utils.h +++ b/ydb/library/yql/dq/comp_nodes/type_utils.h @@ -51,30 +51,16 @@ struct TWideUnboxedHasher { const TKeyTypes& Types; }; -inline bool UnwrapBlockTypes(const TArrayRef& typeComponents, std::vector& result) -{ - bool hasBlock = false; - bool hasNonBlock = false; - - result.reserve(typeComponents.size()); - for (TType* type : typeComponents) { - if (type->GetKind() == TType::EKind::Block) { - hasBlock = true; - type = static_cast(type)->GetItemType(); - } else { - hasNonBlock = true; - } - result.push_back(type); - } - MKQL_ENSURE(hasBlock != hasNonBlock, "Inconsistent wide item types: mixing of blocks and non-blocks detected"); - return hasBlock; -}; +bool UnwrapBlockTypes(const TArrayRef& typeComponents, std::vector& result); + +void WrapArrayBlockTypes(std::vector& types, const TProgramBuilder& pb); + +int ArrowScalarAsInt(const TArrowBlock& scalar); + +bool ForceLeftOptional(EJoinKind kind); -inline void WrapArrayBlockTypes(std::vector& types, const TProgramBuilder& pb) -{ - std::transform(types.begin(), types.end(), types.begin(), - [&](TType* type) { return pb.NewBlockType(type, TBlockType::EShape::Many); }); -} +// Left join causes all right columns to be nullable +bool ForceRightOptional(EJoinKind kind); } 
// namespace NMiniKQL } // namespace NKikimr \ No newline at end of file diff --git a/ydb/library/yql/dq/comp_nodes/ut/dq_block_hash_join_ut.cpp b/ydb/library/yql/dq/comp_nodes/ut/dq_block_hash_join_ut.cpp deleted file mode 100644 index 481d6bd5e389..000000000000 --- a/ydb/library/yql/dq/comp_nodes/ut/dq_block_hash_join_ut.cpp +++ /dev/null @@ -1,106 +0,0 @@ -#include "utils/dq_setup.h" -#include "utils/dq_factories.h" -#include "utils/utils.h" - -#include -#include -#include -#include -#include - -namespace NKikimr { -namespace NMiniKQL { - -namespace { - -NUdf::TUnboxedValue DoTestDqBlockHashJoin( - TDqSetup& setup, - TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector& leftKeyColumns, - TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector& rightKeyColumns, - EJoinKind joinKind -) { - TDqProgramBuilder& pb = setup.GetDqProgramBuilder(); - - TRuntimeNode leftList = pb.Arg(leftType); - TRuntimeNode rightList = pb.Arg(rightType); - const auto leftStream = ToWideStream(pb, leftList); - const auto rightStream = ToWideStream(pb, rightList); - const auto joinNode = pb.DqBlockHashJoin(leftStream, rightStream, joinKind, leftKeyColumns, rightKeyColumns, leftStream.GetStaticType()); - - const auto resultNode = FromWideStream(pb, joinNode); - - const auto graph = setup.BuildGraph(resultNode, {leftList.GetNode(), rightList.GetNode()}); - auto& ctx = graph->GetContext(); - - graph->GetEntryPoint(0, true)->SetValue(ctx, std::move(leftListValue)); - graph->GetEntryPoint(1, true)->SetValue(ctx, std::move(rightListValue)); - return graph->GetValue(); -} - -void RunTestDqBlockHashJoin( - TDqSetup& setup, EJoinKind joinKind, - TType* expectedType, const NUdf::TUnboxedValue& expected, - TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector& leftKeyColumns, - TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector& rightKeyColumns -) { - const auto got = DoTestDqBlockHashJoin( - setup, - leftType, std::move(leftListValue), 
leftKeyColumns, - rightType, std::move(rightListValue), rightKeyColumns, - joinKind - ); - - UNIT_ASSERT(got.HasValue()); - CompareListsIgnoringOrder(expectedType, expected, got); -} - -} // namespace - -Y_UNIT_TEST_SUITE(TDqBlockHashJoinBasicTest) { - - Y_UNIT_TEST(TestBasicPassthrough) { - TDqSetup setup(GetDqNodeFactory()); - - TVector leftKeys = {1, 2, 3, 4, 5}; - TVector leftValues = {"a", "b", "c", "d", "e"}; - - TVector rightKeys = {2, 3, 4, 6, 7}; - TVector rightValues = {"x", "y", "z", "u", "v"}; - - TVector expectedKeys = {1, 2, 3, 4, 5, 2, 3, 4, 6, 7}; - TVector expectedValues = {"a", "b", "c", "d", "e", "x", "y", "z", "u", "v"}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); - - RunTestDqBlockHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestEmptyStreams) { - TDqSetup setup(GetDqNodeFactory()); - - TVector emptyKeys; - TVector emptyValues; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); - - RunTestDqBlockHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - -} // Y_UNIT_TEST_SUITE - -} // namespace NMiniKQL -} // namespace NKikimr diff --git a/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp b/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp new file mode 100644 index 000000000000..faf38b3393ff --- /dev/null +++ b/ydb/library/yql/dq/comp_nodes/ut/dq_hash_join_ut.cpp @@ -0,0 +1,463 @@ +#include 
"utils/dq_factories.h" +#include "utils/dq_setup.h" +#include "utils/utils.h" +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NMiniKQL { + +namespace { +struct TJoinTestData { + std::unique_ptr> Setup = std::make_unique>(); + EJoinKind Kind; + TypeAndValue Left; + TVector LeftKeyColmns = {0}; + TypeAndValue Right; + TVector RightKeyColmns = {0}; + TypeAndValue Result; +}; + +TJoinTestData BasicInnerJoinTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {1, 2, 3, 4, 5}; + TVector leftValues = {"a1", "b1", "c1", "d1", "e1"}; + + TVector rightKeys = {2, 3, 4, 5, 6}; + TVector rightValues = {"b2", "c2", "d2", "e2", "f2"}; + + TVector expectedKeysLeft = {2, 3, 4, 5}; + TVector expectedValuesLeft = {"b1", "c1", "d1", "e1"}; + TVector expectedKeysRight = {2, 3, 4, 5}; + TVector expectedValuesRight = {"b2", "c2", "d2", "e2"}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Inner; + return td; +} + +TJoinTestData EmptyInnerJoinTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector emptyKeys = {}; + TVector emptyValues = {}; + + td.Left = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); + td.Right = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); + td.Result = ConvertVectorsToTuples(setup, emptyKeys, emptyValues, emptyKeys, emptyValues); + td.Kind = EJoinKind::Inner; + return td; +} + +TJoinTestData CrossJoinTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {1, 1, 1, 1, 1}; + TVector leftValues = {"a1", "b1", "c1", "d1", "e1"}; + + TVector rightKeys = {1, 1, 1, 1, 1}; + TVector rightValues = {"a2", "b2", "c2", "d2", "e2"}; + + TVector expectedKeysLeft(25, ui64{1}); + + TVector expectedValuesLeft = {"a1", "b1", "c1", "d1", 
"e1", "a1", "b1", "c1", "d1", "e1", "a1", "b1", "c1", + "d1", "e1", "a1", "b1", "c1", "d1", "e1", "a1", "b1", "c1", "d1", "e1"}; + TVector expectedKeysRight(25, ui64{1}); + + TVector expectedValuesRight = {"a2", "a2", "a2", "a2", "a2", "b2", "b2", "b2", "b2", + "b2", "c2", "c2", "c2", "c2", "c2", "d2", "d2", "d2", + "d2", "d2", "e2", "e2", "e2", "e2", "e2"}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Inner; + return td; +} + +TJoinTestData MixedKeysInnerTestData() { + + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {1, 1, 2, 3, 4}; + TVector leftValues = {"a1", "b1", "c1", "d1", "e1"}; + + TVector rightKeys = {1, 2, 2, 4, 5}; + TVector rightValues = {"a2", "b2", "c2", "d2", "e2"}; + + TVector expectedKeysLeft = {1, 1, 2, 2, 4}; + TVector expectedValuesLeft = { + "a1", + "b1", + "c1", + "c1", + "e1", + }; + TVector expectedKeysRight = {1, 1, 2, 2, 4}; + TVector expectedValuesRight = { + "a2", + "a2", + "b2", + "c2", + "d2", + }; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Inner; + return td; +} + +TJoinTestData EmptyLeftInnerTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys; + TVector leftValues; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"x", "y", "z"}; + + TVector expectedKeysLeft; + TVector expectedValuesLeft; + TVector expectedKeysRight; + TVector expectedValuesRight; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + 
ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Inner; + return td; +} + +TJoinTestData EmptyRightInnerTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {1, 2, 3}; + TVector leftValues = {"a", "b", "c"}; + + TVector rightKeys; + TVector rightValues; + + TVector expectedKeysLeft; + TVector expectedValuesLeft; + TVector expectedKeysRight; + TVector expectedValuesRight; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Inner; + return td; +} + +TJoinTestData LeftJoinTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {1, 2, 3}; + TVector leftValues = {"a", "b", "c"}; + + TVector rightKeys; + TVector rightValues; + + TVector expectedKeysLeft = leftKeys; + TVector expectedValuesLeft = leftValues; + TVector> expectedKeysRight(3, std::nullopt); + TVector> expectedValuesRight(3, std::nullopt); + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + + td.Kind = EJoinKind::Left; + return td; +} + +TJoinTestData FullBehavesAsLeftIfRightEmptyTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {1, 2, 3}; + TVector leftValues = {"a", "b", "c"}; + + TVector rightKeys; + TVector rightValues; + + TVector expectedKeysLeft = leftKeys; + TVector expectedValuesLeft = leftValues; + TVector> expectedKeysRight(3, std::nullopt); + TVector> expectedValuesRight(3, std::nullopt); + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, 
rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Full; + return td; +} + +TJoinTestData RightJoinTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys; + TVector leftValues; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a", "b", "c"}; + + TVector> expectedKeysLeft(3, std::nullopt); + TVector> expectedValuesLeft(3, std::nullopt); + TVector expectedKeysRight = rightKeys; + TVector expectedValuesRight = rightValues; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Right; + return td; +} + +TJoinTestData FullBehavesAsRightIfLeftEmptyTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys; + TVector leftValues; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a", "b", "c"}; + + TVector> expectedKeysLeft(3, std::nullopt); + TVector> expectedValuesLeft(3, std::nullopt); + TVector expectedKeysRight = rightKeys; + TVector expectedValuesRight = rightValues; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Full; + return td; +} + +TJoinTestData FullTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {2, 3, 4}; + TVector leftValues = {"b1", "c1", "d1"}; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a2", "b2", "c2"}; + + TVector> expectedKeysLeft = {std::nullopt, 2, 3, 4}; + TVector> expectedValuesLeft = {std::nullopt, "b1", "c1", "d1"}; + TVector> expectedKeysRight = {1, 2, 3, 
std::nullopt}; + TVector> expectedValuesRight = {"a2", "b2", "c2", std::nullopt}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Full; + return td; +} + +TJoinTestData ExclusionTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {2, 3, 4}; + TVector leftValues = {"b1", "c1", "d1"}; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a2", "b2", "c2"}; + + TVector> expectedKeysLeft = {std::nullopt, 4}; + TVector> expectedValuesLeft = {std::nullopt, "d1"}; + TVector> expectedKeysRight = {1, std::nullopt}; + TVector> expectedValuesRight = {"a2", std::nullopt}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = + ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); + td.Kind = EJoinKind::Exclusion; + return td; +} + +TJoinTestData LeftSemiTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {2, 3, 4}; + TVector leftValues = {"b1", "c1", "d1"}; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a2", "b2", "c2"}; + + TVector> expectedKeys = {2, 3}; + TVector> expectedValues = {"b1", "c1"}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); + td.Kind = EJoinKind::LeftSemi; + return td; +} + +TJoinTestData RightSemiTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {2, 3, 4}; + TVector leftValues = {"b1", "c1", "d1"}; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a2", "b2", "c2"}; + + TVector> expectedKeys = {2, 3}; 
+ TVector> expectedValues = {"b2", "c2"}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); + td.Kind = EJoinKind::RightSemi; + return td; +} + +TJoinTestData LeftOnlyTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {2, 3, 4}; + TVector leftValues = {"b1", "c1", "d1"}; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a2", "b2", "c2"}; + + TVector> expectedKeys = {4}; + TVector> expectedValues = {"d1"}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); + td.Kind = EJoinKind::LeftOnly; + return td; +} + +TJoinTestData RightOnlyTestData() { + TJoinTestData td; + auto& setup = *td.Setup; + TVector leftKeys = {2, 3, 4}; + TVector leftValues = {"b1", "c1", "d1"}; + + TVector rightKeys = {1, 2, 3}; + TVector rightValues = {"a2", "b2", "c2"}; + + TVector> expectedKeys = {1}; + TVector> expectedValues = {"a2"}; + + td.Left = ConvertVectorsToTuples(setup, leftKeys, leftValues); + td.Right = ConvertVectorsToTuples(setup, rightKeys, rightValues); + td.Result = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); + td.Kind = EJoinKind::RightOnly; + return td; +} + +void Test(TJoinTestData testData, bool blockJoin) { + TJoinDescription descr; + descr.Setup = testData.Setup.get(); + descr.LeftSource.KeyColumnIndexes = testData.LeftKeyColmns; + descr.LeftSource.ColumnTypes = + AS_TYPE(TTupleType, AS_TYPE(TListType, testData.Left.Type)->GetItemType())->GetElements(); + descr.LeftSource.ValuesList = testData.Left.Value; + descr.RightSource.KeyColumnIndexes = testData.RightKeyColmns; + descr.RightSource.ColumnTypes = + AS_TYPE(TTupleType, AS_TYPE(TListType, testData.Right.Type)->GetItemType())->GetElements(); + 
descr.RightSource.ValuesList = testData.Right.Value; + + THolder got = ConstructJoinGraphStream( + testData.Kind, blockJoin ? ETestedJoinAlgo::kBlockHash : ETestedJoinAlgo::kScalarHash, descr); + // FromWideStream + if (blockJoin) { + CompareListAndBlockStreamIgnoringOrder(testData.Result, *got); + } else { + CompareListAndStreamIgnoringOrder(testData.Result, *got); + } +} + +} // namespace + +Y_UNIT_TEST_SUITE(TDqHashJoinBasicTest) { + Y_UNIT_TEST_TWIN(TestBasicPassthrough, BlockJoin) { + Test(BasicInnerJoinTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestCrossPassthrough, BlockJoin) { + Test(CrossJoinTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestMixedKeysPassthrough, BlockJoin) { + Test(MixedKeysInnerTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestEmptyFlows, BlockJoin) { + Test(EmptyInnerJoinTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestEmptyLeft, BlockJoin) { + Test(EmptyLeftInnerTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestEmptyRight, BlockJoin) { + Test(EmptyRightInnerTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestLeftKind, BlockJoin) { + Test(LeftJoinTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(FullBehaves, BlockJoin) { + Test(FullBehavesAsLeftIfRightEmptyTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestRightKind, BlockJoin) { + Test(RightJoinTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestFullKindBehavesAsRightIfLeftIsEmpty, BlockJoin) { + Test(FullBehavesAsRightIfLeftEmptyTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestFullKind, BlockJoin) { + Test(FullTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestExclusionKind, BlockJoin) { + Test(ExclusionTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestLeftSemiKind, BlockJoin) { + Test(LeftSemiTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestRightSemiKind, BlockJoin) { + Test(RightSemiTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestLeftOnlyKind, BlockJoin) { + Test(LeftOnlyTestData(), BlockJoin); + } + + Y_UNIT_TEST_TWIN(TestRightOnlyKind, 
BlockJoin) { + Test(RightOnlyTestData(), BlockJoin); + } +} +} // namespace NKikimr::NMiniKQL \ No newline at end of file diff --git a/ydb/library/yql/dq/comp_nodes/ut/dq_scalar_hash_join_ut.cpp b/ydb/library/yql/dq/comp_nodes/ut/dq_scalar_hash_join_ut.cpp deleted file mode 100644 index 2239d9cddade..000000000000 --- a/ydb/library/yql/dq/comp_nodes/ut/dq_scalar_hash_join_ut.cpp +++ /dev/null @@ -1,499 +0,0 @@ -#include "utils/dq_setup.h" -#include "utils/dq_factories.h" -#include "utils/utils.h" - -#include -#include -#include -#include -#include -#include - -namespace NKikimr { -namespace NMiniKQL { - -namespace { - -THolder DoTestDqScalarHashJoin( - TDqSetup& setup, - TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector& leftKeyColumns, - TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector& rightKeyColumns, - EJoinKind joinKind -) { - TDqProgramBuilder& pb = setup.GetDqProgramBuilder(); - - TRuntimeNode leftList = pb.Arg(leftType); - TRuntimeNode rightList = pb.Arg(rightType); - const auto leftFlow = ToWideFlow(pb, leftList); - const auto rightFlow = ToWideFlow(pb, rightList); - - TVector resultTypes; - - auto leftComponents = GetWideComponents(leftFlow.GetStaticType()); - for (auto* type : leftComponents) { - resultTypes.push_back(type); - } - - auto rightComponents = GetWideComponents(rightFlow.GetStaticType()); - for (auto* type : rightComponents) { - resultTypes.push_back(type); - } - - auto resultMultiType = pb.NewMultiType(resultTypes); - auto resultFlowType = pb.NewFlowType(resultMultiType); - - const auto joinNode = pb.DqScalarHashJoin(leftFlow, rightFlow, joinKind, leftKeyColumns, rightKeyColumns, resultFlowType); - - const auto resultNode = FromWideStreamToTupleStream(pb, pb.FromFlow(joinNode)); - - auto graph = setup.BuildGraph(resultNode, {leftList.GetNode(), rightList.GetNode()}); - auto& ctx = graph->GetContext(); - - graph->GetEntryPoint(0, true)->SetValue(ctx, std::move(leftListValue)); - graph->GetEntryPoint(1, 
true)->SetValue(ctx, std::move(rightListValue)); - return graph; -} - -void RunTestDqScalarHashJoin( - TDqSetup& setup, EJoinKind joinKind, - TType* expectedType, const NUdf::TUnboxedValue& expected, - TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector& leftKeyColumns, - TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector& rightKeyColumns -) { - auto got = DoTestDqScalarHashJoin( - setup, - leftType, std::move(leftListValue), leftKeyColumns, - rightType, std::move(rightListValue), rightKeyColumns, - joinKind - ); - - CompareListAndStreamIgnoringOrder(expectedType, expected, *got); -} - -} // namespace - -Y_UNIT_TEST_SUITE(TDqScalarHashJoinBasicTest) { - - Y_UNIT_TEST(TestBasicPassthrough) { - TDqSetup setup(GetDqNodeFactory()); - - TVector leftKeys = {1, 2, 3, 4, 5}; - TVector leftValues = {"a", "b1", "c1", "d1", "e1"}; - - TVector rightKeys = {2, 3, 4, 5, 6}; - TVector rightValues = {"b2", "c2", "d2", "e2", "f"}; - - TVector expectedKeysLeft = {2, 3, 4, 5}; - TVector expectedValuesLeft = {"b1", "c1", "d1", "e1"}; - TVector expectedKeysRight = {2, 3, 4, 5}; - TVector expectedValuesRight = {"b2", "c2", "d2", "e2"}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestCrossPassthrough) { - TDqSetup setup(GetDqNodeFactory()); - - TVector leftKeys = {1, 1, 1, 1, 1}; - TVector leftValues = {"a1", "b1", "c1", "d1", "e1"}; - - TVector rightKeys = {1, 1, 1, 1, 1}; - TVector rightValues = {"a2", "b2", "c2", "d2", "e2"}; - - TVector expectedKeysLeft(25, ui64{1}); - - TVector expectedValuesLeft = { - 
"a1", "b1", "c1", "d1", "e1", - "a1", "b1", "c1", "d1", "e1", - "a1", "b1", "c1", "d1", "e1", - "a1", "b1", "c1", "d1", "e1", - "a1", "b1", "c1", "d1", "e1" - }; - TVector expectedKeysRight(25, ui64{1}); - - TVector expectedValuesRight = { - "a2", "a2", "a2", "a2", "a2", - "b2", "b2", "b2", "b2", "b2", - "c2","c2","c2","c2","c2", - "d2","d2","d2","d2","d2", - "e2","e2","e2","e2","e2"}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); - RunTestDqScalarHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - - } - - Y_UNIT_TEST(TestMixedKeysPassthrough) { - TDqSetup setup(GetDqNodeFactory()); - - TVector leftKeys = {1, 1, 2, 3, 4}; - TVector leftValues = {"a1", "b1", "c1", "d1", "e1"}; - - TVector rightKeys = {1, 2, 2, 4, 5}; - TVector rightValues = {"a2", "b2", "c2", "d2", "e2"}; - - TVector expectedKeysLeft = { - 1, 1, 2, 2, 4 - }; - - TVector expectedValuesLeft = { - "a1", "b1", "c1", "c1", "e1", - }; - TVector expectedKeysRight = { - 1, 1, 2, 2, 4 - }; - - TVector expectedValuesRight = { - "a2", "a2", "b2", "c2", "d2", - }; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft, expectedKeysRight, expectedValuesRight); - RunTestDqScalarHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - - } - - Y_UNIT_TEST(TestEmptyFlows) { - TDqSetup setup(GetDqNodeFactory()); - - TVector emptyKeys; - TVector emptyValues; 
- - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues, emptyKeys, emptyValues); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestEmptyLeft) { - TDqSetup setup(GetDqNodeFactory()); - - TVector emptyKeys; - TVector emptyValues; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"x", "y", "z"}; - - TVector expectedKeysLeft; - TVector expectedValuesLeft; - TVector expectedKeysRight; - TVector expectedValuesRight; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestEmptyRight) { - TDqSetup setup(GetDqNodeFactory()); - - TVector leftKeys = {1, 2, 3}; - TVector leftValues = {"a", "b", "c"}; - - TVector emptyKeys; - TVector emptyValues; - - TVector expectedKeysLeft; - TVector expectedValuesLeft; - TVector expectedKeysRight; - TVector expectedValuesRight; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, emptyKeys, emptyValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Inner, - expectedType, expected, - leftType, std::move(leftList), {0}, 
- rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestLeftKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {1, 2, 3}; - TVector leftValues = {"a", "b", "c"}; - - TVector rightKeys; - TVector rightValues; - - TVector expectedKeysLeft = leftKeys; - TVector expectedValuesLeft = leftValues; - TVector> expectedKeysRight(3, std::nullopt); - TVector> expectedValuesRight(3, std::nullopt); - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Left, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestFullKindBehavesAsLeftIfRightIsEmpty) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {1, 2, 3}; - TVector leftValues = {"a", "b", "c"}; - - TVector rightKeys; - TVector rightValues; - - TVector expectedKeysLeft = leftKeys; - TVector expectedValuesLeft = leftValues; - TVector> expectedKeysRight(3, std::nullopt); - TVector> expectedValuesRight(3, std::nullopt); - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Full, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestRightKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys; - TVector leftValues; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a", "b", "c"}; - - TVector> expectedKeysLeft(3, 
std::nullopt); - TVector> expectedValuesLeft(3, std::nullopt); - TVector expectedKeysRight = rightKeys; - TVector expectedValuesRight = rightValues; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Right, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestFullKindBehavesAsRightIfLeftIsEmpty) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys; - TVector leftValues; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a", "b", "c"}; - - TVector> expectedKeysLeft(3, std::nullopt); - TVector> expectedValuesLeft(3, std::nullopt); - TVector expectedKeysRight = rightKeys; - TVector expectedValuesRight = rightValues; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Full, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - - Y_UNIT_TEST(TestFullKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {2, 3, 4}; - TVector leftValues = {"b1","c1","d1"}; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a2", "b2", "c2"}; - - TVector> expectedKeysLeft = {std::nullopt,2,3,4}; - TVector> expectedValuesLeft = {std::nullopt, "b1", "c1", "d1"}; - TVector> expectedKeysRight = {1,2,3,std::nullopt}; - TVector> expectedValuesRight = {"a2", "b2", "c2", std::nullopt}; - - auto [leftType, 
leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Full, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestExclusionKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {2, 3, 4}; - TVector leftValues = {"b1","c1","d1"}; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a2", "b2", "c2"}; - - TVector> expectedKeysLeft = {std::nullopt, 4}; - TVector> expectedValuesLeft = {std::nullopt, "d1"}; - TVector> expectedKeysRight = {1,std::nullopt}; - TVector> expectedValuesRight = {"a2", std::nullopt}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeysLeft, expectedValuesLeft,expectedKeysRight, expectedValuesRight); - - RunTestDqScalarHashJoin( - setup, EJoinKind::Exclusion, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestLeftSemiKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {2, 3, 4}; - TVector leftValues = {"b1","c1","d1"}; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a2", "b2", "c2"}; - - TVector> expectedKeys = {2, 3}; - TVector> expectedValues = {"b1","c1"}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); - - RunTestDqScalarHashJoin( - setup, 
EJoinKind::LeftSemi, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - - Y_UNIT_TEST(TestRightSemiKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {2, 3, 4}; - TVector leftValues = {"b1","c1","d1"}; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a2", "b2", "c2"}; - - TVector> expectedKeys = {2, 3}; - TVector> expectedValues = {"b2","c2"}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); - - RunTestDqScalarHashJoin( - setup, EJoinKind::RightSemi, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - Y_UNIT_TEST(TestLeftOnlyKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {2, 3, 4}; - TVector leftValues = {"b1","c1","d1"}; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a2", "b2", "c2"}; - - TVector> expectedKeys = {4}; - TVector> expectedValues = {"d1"}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); - - RunTestDqScalarHashJoin( - setup, EJoinKind::LeftOnly, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - - Y_UNIT_TEST(TestRightOnlyKind) { - TDqSetup setup(GetDqNodeFactory()); - TVector leftKeys = {2, 3, 4}; - TVector leftValues = {"b1","c1","d1"}; - - TVector rightKeys = {1, 2, 3}; - TVector rightValues = {"a2", "b2", "c2"}; - - TVector> expectedKeys = {1}; - TVector> expectedValues = {"a2"}; - - auto [leftType, leftList] = ConvertVectorsToTuples(setup, leftKeys, leftValues); - 
auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeys, rightValues); - auto [expectedType, expected] = ConvertVectorsToTuples(setup, expectedKeys, expectedValues); - - RunTestDqScalarHashJoin( - setup, EJoinKind::RightOnly, - expectedType, expected, - leftType, std::move(leftList), {0}, - rightType, std::move(rightList), {0} - ); - } - - - -} - -} // namespace NMiniKQL -} // namespace NKikimr diff --git a/ydb/library/yql/dq/comp_nodes/ut/utils/utils.cpp b/ydb/library/yql/dq/comp_nodes/ut/utils/utils.cpp index 34188eadfff6..2362410fb754 100644 --- a/ydb/library/yql/dq/comp_nodes/ut/utils/utils.cpp +++ b/ydb/library/yql/dq/comp_nodes/ut/utils/utils.cpp @@ -1,10 +1,13 @@ #include "utils.h" +#include +#include +#include +#include #include #include #include #include -#include namespace NKikimr::NMiniKQL { namespace { @@ -12,35 +15,31 @@ bool IsOptionalOrNull(const TType* type) { return type->IsOptional() || type->IsNull() || type->IsPg(); } -} -TRuntimeNode ToBlockList(TProgramBuilder& pgmBuilder, TRuntimeNode list){ - return pgmBuilder.Map(list, - [&](TRuntimeNode tupleNode) -> TRuntimeNode { - TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); - std::vector> items; - items.emplace_back(NYql::BlockLengthColumnName, pgmBuilder.Nth(tupleNode, tupleType->GetElementsCount() - 1)); - for (size_t i = 0; i < tupleType->GetElementsCount() - 1; i++) { - const auto& memberName = pgmBuilder.GetTypeEnvironment().InternName(ToString(i)); - items.emplace_back(memberName.Str(), pgmBuilder.Nth(tupleNode, i)); - } - return pgmBuilder.NewStruct(items); +} // namespace + +TRuntimeNode ToBlockList(TProgramBuilder& pgmBuilder, TRuntimeNode list) { + return pgmBuilder.Map(list, [&](TRuntimeNode tupleNode) -> TRuntimeNode { + TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); + std::vector> items; + items.emplace_back(NYql::BlockLengthColumnName, pgmBuilder.Nth(tupleNode, tupleType->GetElementsCount() - 1)); + for (size_t i = 0; i < 
tupleType->GetElementsCount() - 1; i++) { + const auto& memberName = pgmBuilder.GetTypeEnvironment().InternName(ToString(i)); + items.emplace_back(memberName.Str(), pgmBuilder.Nth(tupleNode, i)); } - ); + return pgmBuilder.NewStruct(items); + }); } -NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, - const TArrayRef types, const NUdf::TUnboxedValuePod& values -) { - const auto maxLength = CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL, - [](size_t max, const TType* type) { +NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef types, + const NUdf::TUnboxedValuePod& values) { + const auto maxLength = + CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL, [](size_t max, const TType* type) { return std::max(max, CalcMaxBlockItemSize(type)); })); TVector> builders; - std::transform(types.cbegin(), types.cend(), std::back_inserter(builders), - [&](const auto& type) { - return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, - maxLength, &ctx.Builder->GetPgBuilder()); - }); + std::transform(types.cbegin(), types.cend(), std::back_inserter(builders), [&](const auto& type) { + return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, maxLength, &ctx.Builder->GetPgBuilder()); + }); const auto& holderFactory = ctx.HolderFactory; const size_t width = types.size(); @@ -85,28 +84,26 @@ TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool sc const auto blockLenType = pgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); TVector blockItemTypes; - std::transform(itemTypes.cbegin(), itemTypes.cend(), std::back_inserter(blockItemTypes), - [&](const auto& itemType) { - return pgmBuilder.NewBlockType(itemType, scalar ? 
TBlockType::EShape::Scalar : TBlockType::EShape::Many); - }); + std::transform(itemTypes.cbegin(), itemTypes.cend(), std::back_inserter(blockItemTypes), [&](const auto& itemType) { + return pgmBuilder.NewBlockType(itemType, scalar ? TBlockType::EShape::Scalar : TBlockType::EShape::Many); + }); // XXX: Mind the last block length column. blockItemTypes.push_back(blockLenType); return pgmBuilder.NewTupleType(blockItemTypes); } -TType* MakeJoinType(TProgramBuilder& pgmBuilder, EJoinKind joinKind, - TType* leftStreamType, const TVector& leftKeyDrops, - TType* rightListType, const TVector& rightKeyDrops -) { +TType* MakeJoinType(TDqProgramBuilder& pgmBuilder, EJoinKind joinKind, TType* leftStreamType, + const TVector& leftKeyDrops, TType* rightListType, const TVector& rightKeyDrops) { const auto leftStreamItems = ValidateBlockStreamType(leftStreamType); const auto rightListItemType = AS_TYPE(TListType, rightListType)->GetItemType(); - const auto rightPlainStructType = AS_TYPE(TStructType, pgmBuilder.ValidateBlockStructType(AS_TYPE(TStructType, rightListItemType))); + const auto rightPlainStructType = + AS_TYPE(TStructType, pgmBuilder.ValidateBlockStructType(AS_TYPE(TStructType, rightListItemType))); TVector joinReturnItems; const THashSet leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend()); - for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size + for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size if (leftKeyDropsSet.contains(i)) { continue; } @@ -123,77 +120,57 @@ TType* MakeJoinType(TProgramBuilder& pgmBuilder, EJoinKind joinKind, auto memberType = rightPlainStructType->GetMemberType(i); joinReturnItems.push_back(pgmBuilder.NewBlockType( - joinKind == EJoinKind::Inner ? memberType - : IsOptionalOrNull(memberType) ? memberType - : pgmBuilder.NewOptionalType(memberType), - TBlockType::EShape::Many - )); + joinKind == EJoinKind::Inner ? memberType + : IsOptionalOrNull(memberType) ? 
memberType + : pgmBuilder.NewOptionalType(memberType), TBlockType::EShape::Many)); } } - joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType::Id), TBlockType::EShape::Scalar)); + joinReturnItems.push_back(pgmBuilder.LastScalarIndexBlock()); return pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems)); } - // List> -> Stream> TRuntimeNode ToWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode list) { - auto wideFlow = pgmBuilder.ExpandMap(pgmBuilder.ToFlow(list), - [&](TRuntimeNode tupleNode) -> TRuntimeNode::TList { - TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); - TRuntimeNode::TList wide; - wide.reserve(tupleType->GetElementsCount()); - for (size_t i = 0; i < tupleType->GetElementsCount(); i++) { - wide.emplace_back(pgmBuilder.Nth(tupleNode, i)); - } - return wide; + auto wideFlow = pgmBuilder.ExpandMap(pgmBuilder.ToFlow(list), [&](TRuntimeNode tupleNode) -> TRuntimeNode::TList { + TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); + TRuntimeNode::TList wide; + wide.reserve(tupleType->GetElementsCount()); + for (size_t i = 0; i < tupleType->GetElementsCount(); i++) { + wide.emplace_back(pgmBuilder.Nth(tupleNode, i)); } - ); + return wide; + }); return pgmBuilder.FromFlow(wideFlow); } // Stream> -> List> TRuntimeNode FromWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) { - return pgmBuilder.Collect(pgmBuilder.NarrowMap(pgmBuilder.ToFlow(stream), - [&](TRuntimeNode::TList items) -> TRuntimeNode { - TVector tupleElements; - tupleElements.reserve(items.size()); - for (size_t i = 0; i < items.size(); i++) { - tupleElements.emplace_back(items[i]); - } - return pgmBuilder.NewTuple(tupleElements); - }) - ); + return pgmBuilder.Collect( + pgmBuilder.NarrowMap(pgmBuilder.ToFlow(stream), [&](TRuntimeNode::TList items) -> TRuntimeNode { + return pgmBuilder.NewTuple(items); + })); } // List> -> WideFlow TRuntimeNode ToWideFlow(TProgramBuilder& pgmBuilder, TRuntimeNode 
list) { - auto wideFlow = pgmBuilder.ExpandMap(pgmBuilder.ToFlow(list), - [&](TRuntimeNode tupleNode) -> TRuntimeNode::TList { - TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); - TRuntimeNode::TList wide; - wide.reserve(tupleType->GetElementsCount()); - for (size_t i = 0; i < tupleType->GetElementsCount(); i++) { - wide.emplace_back(pgmBuilder.Nth(tupleNode, i)); - } - return wide; + auto wideFlow = pgmBuilder.ExpandMap(pgmBuilder.ToFlow(list), [&](TRuntimeNode tupleNode) -> TRuntimeNode::TList { + TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); + TRuntimeNode::TList wide; + wide.reserve(tupleType->GetElementsCount()); + for (size_t i = 0; i < tupleType->GetElementsCount(); i++) { + wide.emplace_back(pgmBuilder.Nth(tupleNode, i)); } - ); + return wide; + }); return wideFlow; } // WideFlow -> List> TRuntimeNode FromWideFlow(TProgramBuilder& pgmBuilder, TRuntimeNode wideFlow) { - return pgmBuilder.Collect(pgmBuilder.NarrowMap(wideFlow, - [&](TRuntimeNode::TList items) -> TRuntimeNode { - TVector tupleElements; - tupleElements.reserve(items.size()); - for (size_t i = 0; i < items.size(); i++) { - tupleElements.emplace_back(items[i]); - } - return pgmBuilder.NewTuple(tupleElements); - }) - ); + return pgmBuilder.Collect(pgmBuilder.NarrowMap(wideFlow, [&](TRuntimeNode::TList items) -> TRuntimeNode { + return pgmBuilder.NewTuple(items); + })); } TVector ConvertListToVector(const NUdf::TUnboxedValue& list) { @@ -213,18 +190,27 @@ TRuntimeNode FromWideStreamToTupleStream(TProgramBuilder& pgmBuilder, TRuntimeNo [&](TRuntimeNode::TList items) -> TRuntimeNode { return pgmBuilder.NewTuple(items); })); } +NYql::NUdf::TUnboxedValue MakeTupleFromUVRange(const THolderFactory& factory, auto UVRange) { + NYql::NUdf::TUnboxedValue tuple; + NUdf::TUnboxedValue* items = nullptr; + tuple = factory.CreateDirectArrayHolder(std::ranges::distance(UVRange), items); + std::ranges::copy(UVRange, items); + return tuple; +} -TVector 
ConvertStreamToVector(IComputationGraph& stream) { - NUdf::TUnboxedValue v; +TVector ConvertWideStreamToTupleVector(IComputationGraph& stream, size_t tupleSize) { + std::vector buff{tupleSize}; TVector vec; + NYql::NUdf::TUnboxedValue tuple; auto it = stream.GetValue(); while (true) { NYql::NUdf::EFetchStatus status; - status = it.Fetch(v); + status = it.WideFetch(buff.data(), tupleSize); switch (status) { case NYql::NUdf::EFetchStatus::Ok: { - vec.push_back(v); + const THolderFactory& factory = stream.GetContext().HolderFactory; + vec.push_back(MakeTupleFromUVRange(factory, buff)); break; } case NYql::NUdf::EFetchStatus::Finish: { @@ -239,7 +225,6 @@ TVector ConvertStreamToVector(IComputationGraph& stream) { } } - namespace { void CompareVectorsIgnoringOrder(const TType* type, TVector expectedItems, TVector gotItems) { @@ -260,6 +245,42 @@ void CompareVectorsIgnoringOrder(const TType* type, TVector FlattenBlocks(const TComputationContext& ctx, TVector blocks, + const TTupleType* outputType) { + TVector flattened; + TVector tupleBuff; + NYql::NUdf::TUnboxedValue tuple; + TTypeInfoHelper typeInfoHelper; + size_t resultTupleSize = outputType->GetElementsCount(); + std::vector UVBlocks{resultTupleSize}; + std::vector Blocks{resultTupleSize}; + std::vector> InputReaders{resultTupleSize}; + std::vector> InputItemConverters{resultTupleSize}; + for (size_t index = 0; index < resultTupleSize; ++index) { + auto* thisType = outputType->GetElementType(index); + InputReaders[index] = NYql::NUdf::MakeBlockReader(typeInfoHelper, thisType); + InputItemConverters[index] = MakeBlockItemConverter(typeInfoHelper, thisType, ctx.Builder->GetPgBuilder()); + } + + for (NYql::NUdf::TUnboxedValue block : blocks) { + auto uv = block.GetElement(resultTupleSize); + int rows = ArrowScalarAsInt(TArrowBlock::From(uv)); + for (size_t colIdx = 0; colIdx < resultTupleSize; ++colIdx) { + UVBlocks[colIdx] = block.GetElement(colIdx); + Blocks[colIdx] = &TArrowBlock::From(UVBlocks[colIdx]).GetDatum(); 
+ } + for (int rowIdx = 0; rowIdx < rows; ++rowIdx) { + flattened.push_back(MakeTupleFromUVRange( + ctx.HolderFactory, std::views::iota(0u, resultTupleSize) | std::views::transform([&](size_t colIndex) { + return InputItemConverters[colIndex]->MakeValue( + InputReaders[colIndex]->GetItem(*Blocks[colIndex]->array(), rowIdx), + ctx.HolderFactory); + }))); + } + } + return flattened; +} } // namespace void CompareListsIgnoringOrder(const TType* type, const NUdf::TUnboxedValue& expected, @@ -267,9 +288,18 @@ void CompareListsIgnoringOrder(const TType* type, const NUdf::TUnboxedValue& exp CompareVectorsIgnoringOrder(type, ConvertListToVector(expected), ConvertListToVector(gotList)); } -void CompareListAndStreamIgnoringOrder(const TType* type, const NUdf::TUnboxedValue& expected, - IComputationGraph& gotStream) { - CompareVectorsIgnoringOrder(type, ConvertListToVector(expected), ConvertStreamToVector(gotStream)); +void CompareListAndStreamIgnoringOrder(const TypeAndValue& expected, IComputationGraph& gotStream) { + auto listValueType = AS_TYPE(TTupleType, AS_TYPE(TListType, expected.Type)->GetItemType()); + CompareVectorsIgnoringOrder(expected.Type, ConvertListToVector(expected.Value), + ConvertWideStreamToTupleVector(gotStream, listValueType->GetElementsCount())); +} + +void CompareListAndBlockStreamIgnoringOrder(const TypeAndValue& expected, IComputationGraph& gotBlockStream) { + auto listValueType = AS_TYPE(TTupleType, AS_TYPE(TListType, expected.Type)->GetItemType()); + auto blockList = ConvertWideStreamToTupleVector(gotBlockStream, listValueType->GetElementsCount() + 1); + auto flattenedList = FlattenBlocks(gotBlockStream.GetContext(), blockList, listValueType); + + CompareVectorsIgnoringOrder(expected.Type, ConvertListToVector(expected.Value), flattenedList); } } // namespace NKikimr::NMiniKQL diff --git a/ydb/library/yql/dq/comp_nodes/ut/utils/utils.h b/ydb/library/yql/dq/comp_nodes/ut/utils/utils.h index 4a9b9d685c1d..5c478dc5d827 100644 --- 
a/ydb/library/yql/dq/comp_nodes/ut/utils/utils.h +++ b/ydb/library/yql/dq/comp_nodes/ut/utils/utils.h @@ -3,24 +3,24 @@ #include #include + namespace NKikimr { namespace NMiniKQL { // TODO (mfilitov): think how can we reuse the code -// Code from https://github.com/ydb-platform/ydb/blob/main/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h +// Code from +// https://github.com/ydb-platform/ydb/blob/main/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h -// List> -> List, ..., "n": Block, "_yql_block_length": Scalar>> +// List> -> List, ..., "n": Block, "_yql_block_length": +// Scalar>> TRuntimeNode ToBlockList(TProgramBuilder& pgmBuilder, TRuntimeNode list); -NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, - const TArrayRef types, const NUdf::TUnboxedValuePod& values); +NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, const TArrayRef types, + const NUdf::TUnboxedValuePod& values); TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType, bool scalar); -TType* MakeJoinType(TProgramBuilder& pgmBuilder, EJoinKind joinKind, - TType* leftStreamType, const TVector& leftKeyDrops, - TType* rightListType, const TVector& rightKeyDrops -); - +TType* MakeJoinType(TDqProgramBuilder& pgmBuilder, EJoinKind joinKind, TType* leftStreamType, + const TVector& leftKeyDrops, TType* rightListType, const TVector& rightKeyDrops); // List> -> Stream> TRuntimeNode ToWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode list); @@ -37,58 +37,60 @@ TRuntimeNode FromWideFlow(TProgramBuilder& pgmBuilder, TRuntimeNode wideFlow); // Stream> -> Stream> TRuntimeNode FromWideStreamToTupleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream); -TVector ConvertListToVector(const NUdf::TUnboxedValue& list); +TVector ConvertListToVector(const NUdf::TUnboxedValue& list); + +TVector ConvertWideStreamToTupleVector(IComputationGraph& wideStream, size_t tupleSize); + -TVector 
ConvertStreamToVector(IComputationGraph& tupleStream); +struct TypeAndValue { + TType* Type; + NUdf::TUnboxedValue Value; +}; void CompareListsIgnoringOrder(const TType* type, const NUdf::TUnboxedValue& expected, const NUdf::TUnboxedValue& gotList); -void CompareListAndStreamIgnoringOrder(const TType* type, const NUdf::TUnboxedValue& expected, - IComputationGraph& gotStream); -template -const TVector BuildListNodes(TProgramBuilder& pb, - const TVector& vector -) { +void CompareListAndStreamIgnoringOrder(const TypeAndValue& expected, IComputationGraph& gotStream); +void CompareListAndBlockStreamIgnoringOrder(const TypeAndValue& expected, IComputationGraph& gotBlockStream); + +template +const TVector BuildListNodes(TProgramBuilder& pb, const TVector& vector) { TType* itemType; if constexpr (std::is_same_v>) { itemType = pb.NewOptionalType(pb.NewDataType(NUdf::EDataSlot::String)); } else if constexpr (std::is_same_v) { itemType = pb.NewDataType(NUdf::EDataSlot::String); - } else if constexpr (std::is_same_v>) { + } else if constexpr (std::is_same_v>) { itemType = pb.NewOptionalType(pb.NewDataType(NUdf::EDataSlot::Uint64)); } else { itemType = pb.NewDataType(NUdf::TDataType::Id); } TRuntimeNode::TList listItems; - std::transform(vector.cbegin(), vector.cend(), std::back_inserter(listItems), - [&](const auto value) { - if constexpr (std::is_same_v>) { - if (value == std::nullopt) { - return pb.NewEmptyOptional(itemType); - } else { - return pb.NewOptional(pb.NewDataLiteral(*value)); - } - } else if constexpr (std::is_same_v) { - return pb.NewDataLiteral(value); - } else if constexpr (std::is_same_v>) { - if (value == std::nullopt) { - return pb.NewEmptyOptional(itemType); - } else { - return pb.NewOptional(pb.NewDataLiteral(*value)); - } + std::transform(vector.cbegin(), vector.cend(), std::back_inserter(listItems), [&](const auto value) { + if constexpr (std::is_same_v>) { + if (value == std::nullopt) { + return pb.NewEmptyOptional(itemType); + } else { + return 
pb.NewOptional(pb.NewDataLiteral(*value)); + } + } else if constexpr (std::is_same_v) { + return pb.NewDataLiteral(value); + } else if constexpr (std::is_same_v>) { + if (value == std::nullopt) { + return pb.NewEmptyOptional(itemType); } else { - return pb.NewDataLiteral(value); + return pb.NewOptional(pb.NewDataLiteral(*value)); } - }); + } else { + return pb.NewDataLiteral(value); + } + }); return {pb.NewList(itemType, listItems)}; } -template -const TVector BuildListNodes(TProgramBuilder& pb, - const TVector& vector, Tail... vectors -) { +template +const TVector BuildListNodes(TProgramBuilder& pb, const TVector& vector, Tail... vectors) { const auto frontList = BuildListNodes(pb, vector); const auto tailLists = BuildListNodes(pb, std::forward(vectors)...); TVector lists; @@ -99,28 +101,22 @@ const TVector BuildListNodes(TProgramBuilder& pb, } return lists; } -template -const std::pair ConvertVectorsToTuples( - TDqSetup& setup, TVectors... vectors -) { + +template TypeAndValue ConvertVectorsToTuples(TDqSetup& setup, TVectors... vectors) { TProgramBuilder& pb = *setup.PgmBuilder; const auto lists = BuildListNodes(pb, std::forward(vectors)...); const auto tuplesNode = pb.Zip(lists); const auto tuplesNodeType = tuplesNode.GetStaticType(); const auto tuples = setup.BuildGraph(tuplesNode)->GetValue(); - return std::make_pair(tuplesNodeType, tuples); + return {tuplesNodeType, tuples}; } - -template -std::pair, NUdf::TUnboxedValue> ConvertVectorsToRuntimeTypesAndValue( - TDqSetup& setup, TVectors... vectors -) { +template +std::pair, NUdf::TUnboxedValue> ConvertVectorsToRuntimeTypesAndValue(TDqSetup& setup, + TVectors... 
vectors) { auto p = ConvertVectorsToTuples(setup, vectors...); - return std::make_pair(AS_TYPE(TTupleType, AS_TYPE(TListType, p.first)->GetItemType())->GetElements(), p.second); + return std::make_pair(AS_TYPE(TTupleType, AS_TYPE(TListType, p.Type)->GetItemType())->GetElements(), p.Value); } - - -} -} +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/ydb/library/yql/dq/comp_nodes/ut/ya.make b/ydb/library/yql/dq/comp_nodes/ut/ya.make index b946708d0e5f..908a3e07ee20 100644 --- a/ydb/library/yql/dq/comp_nodes/ut/ya.make +++ b/ydb/library/yql/dq/comp_nodes/ut/ya.make @@ -7,7 +7,7 @@ PEERDIR( ydb/library/yql/dq/comp_nodes/ut/utils yql/essentials/public/udf/service/exception_policy yql/essentials/sql/pg_dummy - + ydb/core/kqp/tools/join_perf ydb/core/kqp/runtime library/cpp/testing/unittest @@ -21,8 +21,7 @@ YQL_LAST_ABI_VERSION() SRCS( dq_hash_combine_ut.cpp - dq_block_hash_join_ut.cpp - dq_scalar_hash_join_ut.cpp + dq_hash_join_ut.cpp ) END() diff --git a/ydb/library/yql/dq/comp_nodes/ya.make.inc b/ydb/library/yql/dq/comp_nodes/ya.make.inc index e3490d5b5c9e..0fe5fa18b821 100644 --- a/ydb/library/yql/dq/comp_nodes/ya.make.inc +++ b/ydb/library/yql/dq/comp_nodes/ya.make.inc @@ -23,6 +23,8 @@ SET(ORIG_SOURCES dq_block_hash_join.cpp dq_scalar_hash_join.cpp dq_hash_join_table.cpp + dq_join_common.cpp + type_utils.cpp ) YQL_LAST_ABI_VERSION()