Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#include <yql/essentials/providers/common/mkql/yql_type_mkql.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <yql/essentials/core/dq_integration/yql_dq_integration.h>

#include <ydb/library/yql/dq/comp_nodes/type_utils.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <cstdlib>

namespace NKikimr {
Expand Down Expand Up @@ -468,10 +469,25 @@ TIntrusivePtr<IMkqlCallableCompiler> CreateKqlCompiler(const TKqlCompileContext&
TStringStream errorStream;
auto returnType = NCommon::BuildType(*node.GetTypeAnn(), ctx.PgmBuilder(), errorStream);
YQL_ENSURE(returnType, "Failed to build return type: " << errorStream.Str());

auto graceJoinRenames = [&]{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why gracejoin?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest not calling the format with two arrays “gracejoin.” Let’s just call it “the format with two arrays,” since “gracejoin” causes confusion. It’s not urgent for this PR.

// Number of components in the multi-type item of a wide stream node's static type.
// The element count is converted to int by the lambda's declared return type.
auto wideStreamComponentsSize = [](TRuntimeNode node)->int {
return AS_TYPE(TMultiType, AS_TYPE(TStreamType,node.GetStaticType())->GetItemType())->GetElementsCount();
};
// Build the rename list with one entry per input column: left columns in index order first,
// then right columns in index order.
// NOTE(review): the last component of each input is skipped (`- 1`) — presumably the trailing
// block-length column of a wide block stream; confirm against the producers of these inputs.
TDqRenames renames{};
for(int index = 0; index < wideStreamComponentsSize(leftInput) - 1; ++index) {
renames.emplace_back(index, JoinSide::kLeft);
}
for(int index = 0; index < wideStreamComponentsSize(rightInput) - 1; ++index) {
renames.emplace_back(index, JoinSide::kRight);
}
// Convert to TGraceJoinRenames; its Left/Right members are what gets passed to DqBlockHashJoin.
return TGraceJoinRenames::FromDq(renames);
}();


// Use the specialized DqBlockHashJoin method
return ctx.PgmBuilder().DqBlockHashJoin(leftInput, rightInput, joinKind,
leftKeyColumns, rightKeyColumns, returnType);
leftKeyColumns, rightKeyColumns, graceJoinRenames.Left, graceJoinRenames.Right, returnType);
});

compiler->AddCallable(TDqPhyHashCombine::CallableName(), [&ctx](const TExprNode& node, TMkqlBuildContext& buildCtx) {
Expand Down
74 changes: 44 additions & 30 deletions ydb/core/kqp/tools/join_perf/construct_join_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,51 @@ THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJ
TDqProgramBuilder& dqPb = descr.Setup->GetDqProgramBuilder();
TProgramBuilder& pb = static_cast<TProgramBuilder&>(dqPb);

TVector<TType*> resultTypesArr;
TVector<const ui32> leftRenames, rightRenames;
if (joinKind != EJoinKind::RightOnly && joinKind != EJoinKind::RightSemi) {
for (int colIndex = 0; colIndex < std::ssize(descr.LeftSource.ColumnTypes); ++colIndex) {
leftRenames.push_back(colIndex);
leftRenames.push_back(colIndex);
;
TGraceJoinRenames renames;
if (descr.CustomRenames) {
renames = TGraceJoinRenames::FromDq(*descr.CustomRenames);
} else {
if (joinKind != EJoinKind::RightOnly && joinKind != EJoinKind::RightSemi) {
for (int colIndex = 0; colIndex < std::ssize(descr.LeftSource.ColumnTypes); ++colIndex) {
renames.Left.push_back(colIndex);
renames.Left.push_back(colIndex);
}
}
for (auto& resType : descr.LeftSource.ColumnTypes) {
resultTypesArr.push_back([&] {
if (ForceLeftOptional(joinKind) && !resType->IsOptional()) {
return pb.NewOptionalType(resType);
} else {
return resType;
}
}());
if (joinKind != EJoinKind::LeftOnly && joinKind != EJoinKind::LeftSemi) {
for (int colIndex = 0; colIndex < std::ssize(descr.RightSource.ColumnTypes); ++colIndex) {
renames.Right.push_back(colIndex);
renames.Right.push_back(colIndex + std::ssize(descr.LeftSource.ColumnTypes));
}
}
}
if (joinKind != EJoinKind::LeftOnly && joinKind != EJoinKind::LeftSemi) {
for (int colIndex = 0; colIndex < std::ssize(descr.RightSource.ColumnTypes); ++colIndex) {
rightRenames.push_back(colIndex);
rightRenames.push_back(colIndex + std::ssize(resultTypesArr));
}
for (auto* resType : descr.LeftSource.ColumnTypes) {
resultTypesArr.push_back([&] {
if (ForceRightOptional(joinKind) && !resType->IsOptional()) {
return pb.NewOptionalType(resType);
} else {
return resType;
}
}());

const TVector<TType*> resultTypesArr = [&] {
TVector<TType*> arr;
TDqRenames dqRenames = FromGraceFormat(renames);
for (auto rename : dqRenames) {
if (rename.Side == JoinSide::kLeft) {
auto* resType = descr.LeftSource.ColumnTypes[rename.Index];
arr.push_back([&] {
if (ForceLeftOptional(joinKind) && !resType->IsOptional()) {
return pb.NewOptionalType(resType);
} else {
return resType;
}
}());
} else {
auto* resType = descr.RightSource.ColumnTypes[rename.Index];
arr.push_back([&] {
if (ForceRightOptional(joinKind) && !resType->IsOptional()) {
return pb.NewOptionalType(resType);
} else {
return resType;
}
}());
}
}
}
return arr;
}();

struct TJoinArgs {
TRuntimeNode Left;
Expand Down Expand Up @@ -159,7 +172,7 @@ THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJ

return dqPb.FromFlow(dqPb.GraceJoin(ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), joinKind,
descr.LeftSource.KeyColumnIndexes, descr.RightSource.KeyColumnIndexes,
leftRenames, rightRenames, dqPb.NewFlowType(multiResultType)));
renames.Left, renames.Right, dqPb.NewFlowType(multiResultType)));
}
case NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarMap: {
Y_ABORT_IF(descr.RightSource.KeyColumnIndexes.size() > 1,
Expand Down Expand Up @@ -213,12 +226,13 @@ THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJ
blockResultTypes.push_back(dqPb.LastScalarIndexBlock());
return dqPb.DqBlockHashJoin(ToWideStream(dqPb, args.Left), ToWideStream(dqPb, args.Right), joinKind,
descr.LeftSource.KeyColumnIndexes, descr.RightSource.KeyColumnIndexes,
renames.Left, renames.Right,
pb.NewStreamType(pb.NewMultiType(blockResultTypes)));
}
case ETestedJoinAlgo::kScalarHash: {
return pb.FromFlow(dqPb.DqScalarHashJoin(
ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), joinKind, descr.LeftSource.KeyColumnIndexes,
descr.RightSource.KeyColumnIndexes, pb.NewFlowType(multiResultType)));
descr.RightSource.KeyColumnIndexes, renames.Left, renames.Right, pb.NewFlowType(multiResultType)));
}
default:
Y_ABORT("unreachable");
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/tools/join_perf/construct_join_graph.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "benchmark_settings.h"
#include <ydb/library/yql/dq/comp_nodes/dq_program_builder.h>
#include <ydb/library/yql/dq/comp_nodes/type_utils.h>
#include <ydb/library/yql/dq/comp_nodes/ut/utils/dq_setup.h>
#include <ydb/library/yql/dq/comp_nodes/ut/utils/utils.h>

Expand All @@ -16,6 +17,7 @@ struct TJoinDescription {
TJoinSourceData LeftSource;
TJoinSourceData RightSource;
TDqSetup<false>* Setup;
std::optional<TDqRenames> CustomRenames;
};

bool IsBlockJoin(ETestedJoinAlgo algo);
Expand Down
1 change: 0 additions & 1 deletion ydb/core/kqp/ut/join/kqp_block_hash_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ Y_UNIT_TEST_SUITE(KqpBlockHashJoin) {
UNIT_ASSERT_C(status.IsSuccess(), status.GetIssues().ToString());

auto resultSet = status.GetResultSets()[0];
// Current Join implementation is simple and returns all the rows
auto expectedRowsCount = 3;
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), expectedRowsCount);

Expand Down
98 changes: 36 additions & 62 deletions ydb/library/yql/dq/comp_nodes/dq_block_hash_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,21 @@ template <EJoinKind Kind> class TBlockHashJoinWrapper : public TMutableComputati
TBlockHashJoinWrapper(TComputationMutables& mutables, const TVector<TType*>&& resultItemTypes,
const TVector<TType*>&& leftItemTypes, const TVector<ui32>&& leftKeyColumns,
const TVector<TType*>&& rightItemTypes, const TVector<ui32>&& rightKeyColumns,
IComputationNode* leftStream, IComputationNode* rightStream)
IComputationNode* leftStream, IComputationNode* rightStream, TDqRenames renames)
: TBaseComputation(mutables, EValueRepresentation::Boxed)
, ResultItemTypes_(std::move(resultItemTypes))
, LeftItemTypes_(std::move(leftItemTypes))
, LeftKeyColumns_(std::move(leftKeyColumns))
, RightItemTypes_(std::move(rightItemTypes))
, RightKeyColumns_(std::move(rightKeyColumns))
, Renames_(std::move(renames))
, LeftStream_(leftStream)
, RightStream_(rightStream)
{}

NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
return ctx.HolderFactory.Create<TStreamValue>(ctx, LeftKeyColumns_, RightKeyColumns_, LeftStream_, RightStream_,
LeftItemTypes_, RightItemTypes_, ResultItemTypes_);
LeftItemTypes_, RightItemTypes_, ResultItemTypes_, Renames_);
}

private:
Expand All @@ -111,7 +112,7 @@ template <EJoinKind Kind> class TBlockHashJoinWrapper : public TMutableComputati
TStreamValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<ui32>& leftKeyColumns,
const TVector<ui32>& rightKeyColumns, IComputationNode* leftStream, IComputationNode* rightStream,
const TVector<TType*>& leftStreamTypes, const TVector<TType*>& rightStreamTypes,
const TVector<TType*>& resultStreamTypes)
const TVector<TType*>& resultStreamTypes, TDqRenames renames)
: TBase(memInfo)
, Join_(memInfo, TBlockRowSource{ctx, leftStream, leftStreamTypes},
TBlockRowSource{ctx, rightStream, rightStreamTypes},
Expand All @@ -120,6 +121,7 @@ template <EJoinKind Kind> class TBlockHashJoinWrapper : public TMutableComputati
KeyTypesFromColumns(leftStreamTypes, leftKeyColumns)}, ctx.MakeLogger(),
"BlockHashJoin")
, Ctx_(&ctx)
, Output_(std::move(renames), leftStreamTypes, rightStreamTypes)
, OutputTypes_(resultStreamTypes)
{
TTypeInfoHelper typeInfoHelper;
Expand All @@ -129,37 +131,31 @@ template <EJoinKind Kind> class TBlockHashJoinWrapper : public TMutableComputati
}
}

// Number of data columns in one output tuple: every entry of OutputTypes_ except the
// trailing integer column.
int TupleSize() const {
return OutputTypes_.size() - 1 /*mind last integer column*/;
}

// Number of whole tuples currently stored in OutputBuffer_, computed as
// OutputBuffer_.size() / TupleSize().
// The MKQL_ENSURE check fails if the buffer length is not a multiple of TupleSize().
int SizeTuples() const {
MKQL_ENSURE(OutputBuffer_.size() % TupleSize() == 0, "buffer contains tuple parts??");
return OutputBuffer_.size() / TupleSize();
}

void FlushTo(NUdf::TUnboxedValue* output) {
MKQL_ENSURE(!OutputBuffer_.empty(), "make sure we are flushing something, not empty set of tuples");
NUdf::EFetchStatus FlushTo(NUdf::TUnboxedValue* output) {
MKQL_ENSURE(!Output_.OutputBuffer.empty(), "make sure we are flushing something, not empty set of tuples");
TTypeInfoHelper helper;
std::vector<std::unique_ptr<NYql::NUdf::IArrayBuilder>> blockBuilders;
int rows = SizeTuples();
for (int i = 0; i < TupleSize(); ++i) {
int rows = Output_.SizeTuples();
for (int i = 0; i < Output_.TupleSize(); ++i) {
blockBuilders.push_back(MakeArrayBuilder(helper, OutputTypes_[i], Ctx_->ArrowMemoryPool, rows,
&Ctx_->Builder->GetPgBuilder()));
}

for (int rowIndex = 0; rowIndex < rows; ++rowIndex) {
for (int colIndex = 0; colIndex < TupleSize(); ++colIndex) {
int valueIndex = colIndex + rowIndex * TupleSize();
blockBuilders[colIndex]->Add(OutputItemConverters_[colIndex]->MakeItem(OutputBuffer_[valueIndex]));
for (int colIndex = 0; colIndex < Output_.TupleSize(); ++colIndex) {
int valueIndex = colIndex + rowIndex * Output_.TupleSize();
blockBuilders[colIndex]->Add(
OutputItemConverters_[colIndex]->MakeItem(Output_.OutputBuffer[valueIndex]));
}
}
OutputBuffer_.clear();
for (int colIndex = 0; colIndex < TupleSize(); ++colIndex) {
Output_.OutputBuffer.clear();
for (int colIndex = 0; colIndex < Output_.TupleSize(); ++colIndex) {
output[colIndex] = Ctx_->HolderFactory.CreateArrowBlock(blockBuilders[colIndex]->Build(true));
}
output[TupleSize()] = Ctx_->HolderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(rows)));
MKQL_ENSURE(OutputBuffer_.empty(), "something left after flush??");
output[Output_.TupleSize()] =
Ctx_->HolderFactory.CreateArrowBlock(arrow::Datum(static_cast<uint64_t>(rows)));
MKQL_ENSURE(Output_.OutputBuffer.empty(), "something left after flush??");
return NYql::NUdf::EFetchStatus::Ok;
}

private:
Expand All @@ -169,58 +165,33 @@ template <EJoinKind Kind> class TBlockHashJoinWrapper : public TMutableComputati
if (Finished_) {
return NYql::NUdf::EFetchStatus::Finish;
}
auto out = std::back_inserter(OutputBuffer_);
auto consumeOneOrTwo = [&] {
if constexpr (SemiOrOnlyJoin(Kind)) {
return [&](NJoinTable::TTuple tuple) {
MKQL_ENSURE(tuple != nullptr, "null output row in semi/only join?");
std::copy_n(tuple, Join_.ProbeSize(), out);
};
} else {
return [&](NJoinTable::TTuple probe, NJoinTable::TTuple build) {
if (!probe) { // todo: remove nullptr checks for some join types.
probe = NullTuples_.data();
}
std::copy_n(probe, Join_.ProbeSize(), out);

if (!build) {
build = NullTuples_.data();
}
std::copy_n(build, Join_.BuildSize(), out);
};
}
}();
while (SizeTuples() < Threshold) {
auto res = Join_.MatchRows(*Ctx_, consumeOneOrTwo);
while (Output_.SizeTuples() < Threshold_) {
auto res = Join_.MatchRows(*Ctx_, Output_.MakeConsumeFn());
switch (res) {
case EFetchResult::Finish: {
if (SizeTuples() == 0) {
if (Output_.SizeTuples() == 0) {
return NYql::NUdf::EFetchStatus::Finish;
}
FlushTo(output);
Finished_ = true;
return NYql::NUdf::EFetchStatus::Ok;
return FlushTo(output);
}
case EFetchResult::Yield:
return NYql::NUdf::EFetchStatus::Yield;
case EFetchResult::One:
break;
}
}
FlushTo(output);
return NUdf::EFetchStatus::Ok;
return FlushTo(output);
}

private:
TJoin<TBlockRowSource, Kind> Join_;
TComputationContext* Ctx_;
std::vector<NUdf::TUnboxedValue> OutputBuffer_;
TRenamedOutput<Kind> Output_;
std::vector<std::unique_ptr<IBlockItemConverter>> OutputItemConverters_;
const std::vector<TType*> OutputTypes_;
const int Threshold = 10000;
const int Threshold_ = 10000;
bool Finished_ = false;
const std::vector<NYql::NUdf::TUnboxedValue> NullTuples_{
static_cast<size_t>(std::max(Join_.BuildSize(), Join_.ProbeSize())), NYql::NUdf::TUnboxedValuePod{}};
};

void RegisterDependencies() const final {
Expand All @@ -234,14 +205,15 @@ template <EJoinKind Kind> class TBlockHashJoinWrapper : public TMutableComputati
const TVector<ui32> LeftKeyColumns_;
const TVector<TType*> RightItemTypes_;
const TVector<ui32> RightKeyColumns_;
const TDqRenames Renames_;
IComputationNode* LeftStream_;
IComputationNode* RightStream_;
};

} // namespace

IComputationNode* WrapDqBlockHashJoin(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
MKQL_ENSURE(callable.GetInputsCount() == 7, "Expected 7 args");

const auto joinType = callable.GetType()->GetReturnType();
MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream");
Expand Down Expand Up @@ -282,7 +254,6 @@ IComputationNode* WrapDqBlockHashJoin(TCallable& callable, const TComputationNod
const auto joinKindNode = callable.GetInput(2);
const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
const auto joinKind = GetJoinKind(rawKind);
MKQL_ENSURE(joinKind != EJoinKind::Cross, "Only inner join is supported in block hash join prototype");

const auto leftKeyColumnsLiteral = callable.GetInput(3);
const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral);
Expand All @@ -301,16 +272,19 @@ IComputationNode* WrapDqBlockHashJoin(TCallable& callable, const TComputationNod
const auto item = AS_VALUE(TDataLiteral, rightKeyColumnsTuple->GetValue(i));
rightKeyColumns.emplace_back(item->AsValue().Get<ui32>());
}
TDqRenames renames =
FromGraceFormat(TGraceJoinRenames::FromRuntimeNodes(callable.GetInput(5), callable.GetInput(6)));

MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch");

const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0);
const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1);
return std::visit([&](auto kind) -> IComputationNode* {
return new TBlockHashJoinWrapper<decltype(kind)::Kind_>(
ctx.Mutables, std::move(joinItems), std::move(leftStreamItems), std::move(leftKeyColumns),
std::move(rightStreamItems), std::move(rightKeyColumns), leftStream, rightStream);
}, TypifyJoinKind(joinKind));
MKQL_ENSURE(joinKind == EJoinKind::Inner, "Only inner is supported, see gh#26780 for details.");
ValidateRenames(renames, joinKind, std::ssize(leftStreamItems) - 1, std::ssize(rightStreamItems) - 1);

return new TBlockHashJoinWrapper<EJoinKind::Inner>(ctx.Mutables, std::move(joinItems), std::move(leftStreamItems),
std::move(leftKeyColumns), std::move(rightStreamItems),
std::move(rightKeyColumns), leftStream, rightStream, renames);
}

} // namespace NKikimr::NMiniKQL
Loading
Loading