Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/kqp/tools/.clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ ColumnLimit: 120
AllowShortLambdasOnASingleLine: Inline
AllowShortFunctionsOnASingleLine: Empty
PackConstructorInitializers: Never
BreakConstructorInitializers: BeforeComma
BreakConstructorInitializers: BeforeComma
# BinPackLongBracedList: wait for clang-format-21
BinPackArguments: false
BinPackParameters: false
2 changes: 1 addition & 1 deletion ydb/core/kqp/tools/join_perf/benchmark_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ struct TPreset {
};

struct TBenchmarkSettings {

int Seed;
TVector<TPreset> Presets;
TSet<ETestedJoinKeyType> KeyTypes;
TSet<ETestedJoinAlgo> Algorithms;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from matplotlib.ticker import FormatStrFormatter, ScalarFormatter
import pandas as pd
import matplotlib.pyplot as plt
import sys
Expand All @@ -18,11 +19,11 @@
name_parts = run_name.split('_')
only_needed.append({
'run_name': run_name,
'time': obj["resultTime"],
'time': int(obj["resultTime"]),
'join_algorithm': name_parts[0],
'input_data_flavour': name_parts[2],
'left_table_size': name_parts[3],
'right_table_size': name_parts[4],
'left_table_size': int(name_parts[3]),
'right_table_size': int(name_parts[4]),
'key_type': name_parts[1]
}
)
Expand All @@ -45,6 +46,8 @@ def geo_mean_70percent_lowest(series):
log_images = images_root + "log"
Path(simple_images).mkdir(parents=True, exist_ok=True)
Path(log_images).mkdir(parents=True, exist_ok=True)
pd.set_option('display.max_rows', 500)

data_flovours = df['input_data_flavour'].unique()
key_types = df['key_type'].unique()
for data_flavour in data_flovours:
Expand All @@ -56,7 +59,8 @@ def geo_mean_70percent_lowest(series):
fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(10, 8), sharex=True)

for name, group in subset.groupby('join_algorithm'):
group = group.groupby('left_table_size')['time'].apply(lambda x: geo_mean_70percent_lowest(x)).sort_values()
group = group.groupby('left_table_size')['time'].apply(lambda x: geo_mean_70percent_lowest(x)).sort_index()
axes.set_xticks(group.index)
axes.plot(
group.index,
group.values,
Expand All @@ -65,6 +69,7 @@ def geo_mean_70percent_lowest(series):
)
axes.set_ylabel('time')
axes.set_xlabel('left_rows')
axes.get_xaxis().set_major_formatter(FormatStrFormatter('%d'))
axes.legend()

fig.suptitle(graph_name, fontsize=16)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include "benchmark_settings.h"
#include <ydb/core/kqp/tools/join_perf/benchmark_settings.h>

#include "ydb/core/kqp/tools/combiner_perf/fs_utils.h"
#include <ydb/core/kqp/tools/combiner_perf/fs_utils.h>
#include <library/cpp/getopt/small/last_getopt.h>
#include <library/cpp/getopt/small/last_getopt_opts.h>
#include <library/cpp/getopt/small/last_getopt_parse_result.h>
#include <library/cpp/getopt/small/last_getopt_parser.h>

#include "joins.h"
#include <ydb/core/kqp/tools/join_perf/joins.h>
#include <filesystem>
#include <util/string/printf.h>

Expand Down Expand Up @@ -40,7 +40,7 @@ int main(int argc, char** argv) {
int samples = 1;
int scale = 1;
opts.AddHelpOption().Help("visit NBenchmarkSizes namespace in benchmark_settings.cpp for explanation");
opts.AddLongOption('c', "case")
opts.AddLongOption('p', "preset")
.Help("left and right table sizes to choose for joins benchmark.")
.Choices({"exp", "linear", "small"})
.DefaultValue("small")
Expand All @@ -58,13 +58,14 @@ int main(int argc, char** argv) {
}
}();
});
opts.AddLongOption('s', "samples").Help("number representing how much to repeat single case. useful for noise reduction.").DefaultValue(1).StoreResult(&samples);
opts.AddLongOption("scale").Help("size of smallest table in case").DefaultValue(1<<18).StoreResult(&scale);
opts.AddLongOption("samples").Help("number representing how much to repeat single case. useful for noise reduction.").DefaultValue(1).StoreResult(&samples);
opts.AddLongOption("scale").Help("size of smallest table in case").DefaultValue(1).StoreResult(&scale);
opts.AddLongOption("seed").Help("seed for keys generation").DefaultValue(123).StoreResult(&params.Seed);
params.Algorithms = {
NKikimr::NMiniKQL::ETestedJoinAlgo::kBlockMap,
// NKikimr::NMiniKQL::ETestedJoinAlgo::kBlockHash,
NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarMap,
// NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarHash,
NKikimr::NMiniKQL::ETestedJoinAlgo::kBlockHash,
// NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarMap, // slow
NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarHash,
NKikimr::NMiniKQL::ETestedJoinAlgo::kScalarGrace,
};
params.KeyTypes = {
Expand Down
27 changes: 27 additions & 0 deletions ydb/core/kqp/tools/join_perf/bin/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
PROGRAM(join_perf)

YQL_LAST_ABI_VERSION()

IF (MKQL_RUNTIME_VERSION)
CFLAGS(
-DMKQL_RUNTIME_VERSION=$MKQL_RUNTIME_VERSION
)
ENDIF()

PEERDIR(
ydb/core/kqp/tools/combiner_perf
ydb/core/kqp/tools/join_perf
library/cpp/lfalloc/alloc_profiler
library/cpp/dwarf_backtrace
library/cpp/dwarf_backtrace/registry
library/cpp/getopt
library/cpp/getopt/small
library/cpp/json
)

SRCS(
main.cpp
)

END()

77 changes: 48 additions & 29 deletions ydb/core/kqp/tools/join_perf/construct_join_graph.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
#include "construct_join_graph.h"
#include <algorithm>
#include <ydb/library/yql/dq/comp_nodes/type_utils.h>
#include <ydb/library/yql/dq/comp_nodes/ut/utils/utils.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_node_printer.h>

namespace NKikimr::NMiniKQL {

namespace {

TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, TRuntimeNode leftList,
TRuntimeNode BuildBlockJoin(TDqProgramBuilder& pgmBuilder, EJoinKind joinKind, TRuntimeNode leftList,
TArrayRef<const ui32> leftKeyColumns, const TVector<ui32>& leftKeyDrops,
TRuntimeNode rightList, TArrayRef<const ui32> rightKeyColumns,
const TVector<ui32>& rightKeyDrops, bool rightAny) {
TRuntimeNode rightList,
TArrayRef<const ui32> rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny) {
const auto leftStream = ToWideStream(pgmBuilder, leftList);
const auto rightBlockList = ToBlockList(pgmBuilder, rightList);

Expand Down Expand Up @@ -59,26 +61,42 @@ bool IsBlockJoin(ETestedJoinAlgo kind) {
return kind == ETestedJoinAlgo::kBlockHash || kind == ETestedJoinAlgo::kBlockMap;
}

THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, TInnerJoinDescription descr) {
THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJoinAlgo algo, TJoinDescription descr) {

const EJoinKind kInnerJoin = EJoinKind::Inner;
const bool scalar = !IsBlockJoin(algo);
TDqProgramBuilder& dqPb = descr.Setup->GetDqProgramBuilder();
TProgramBuilder& pb = static_cast<TProgramBuilder&>(dqPb);

TVector<TType* const> resultTypesArr;
TVector<TType*> resultTypesArr;
TVector<const ui32> leftRenames, rightRenames;
for (ui32 idx = 0; idx < std::ssize(descr.LeftSource.ColumnTypes); ++idx) {
resultTypesArr.push_back(descr.LeftSource.ColumnTypes[idx]);
leftRenames.push_back(idx);
leftRenames.push_back(idx);
if (joinKind != EJoinKind::RightOnly && joinKind != EJoinKind::RightSemi) {
for (int colIndex = 0; colIndex < std::ssize(descr.LeftSource.ColumnTypes); ++colIndex) {
leftRenames.push_back(colIndex);
leftRenames.push_back(colIndex);
}
for (auto& resType : descr.LeftSource.ColumnTypes) {
resultTypesArr.push_back([&] {
if (ForceLeftOptional(joinKind) && !resType->IsOptional()) {
return pb.NewOptionalType(resType);
} else {
return resType;
}
}());
}
}
for (ui32 idx = 0; idx < std::ssize(descr.RightSource.ColumnTypes); ++idx) {
if (std::ranges::all_of(descr.RightSource.KeyColumnIndexes,
[idx](ui32 keyColumnIdx) { return keyColumnIdx != idx; })) {
rightRenames.push_back(idx);
rightRenames.push_back(resultTypesArr.size());
resultTypesArr.push_back(descr.RightSource.ColumnTypes[idx]);
if (joinKind != EJoinKind::LeftOnly && joinKind != EJoinKind::LeftSemi) {
for (int colIndex = 0; colIndex < std::ssize(descr.RightSource.ColumnTypes); ++colIndex) {
rightRenames.push_back(colIndex);
rightRenames.push_back(colIndex + std::ssize(resultTypesArr));
}
for (auto* resType : descr.LeftSource.ColumnTypes) {
resultTypesArr.push_back([&] {
if (ForceRightOptional(joinKind) && !resType->IsOptional()) {
return pb.NewOptionalType(resType);
} else {
return resType;
}
}());
}
}

Expand Down Expand Up @@ -139,7 +157,7 @@ THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T

case ETestedJoinAlgo::kScalarGrace: {

return dqPb.FromFlow(dqPb.GraceJoin(ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), kInnerJoin,
return dqPb.FromFlow(dqPb.GraceJoin(ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), joinKind,
descr.LeftSource.KeyColumnIndexes, descr.RightSource.KeyColumnIndexes,
leftRenames, rightRenames, dqPb.NewFlowType(multiResultType)));
}
Expand Down Expand Up @@ -173,7 +191,7 @@ THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T
std::ssize(descr.RightSource.ColumnTypes) -
descr.RightSource.KeyColumnIndexes.size());
TRuntimeNode mapJoinSomething =
pb.MapJoinCore(source, rightDict, kInnerJoin, descr.LeftSource.KeyColumnIndexes, scalarMapRenames.Left,
pb.MapJoinCore(source, rightDict, joinKind, descr.LeftSource.KeyColumnIndexes, scalarMapRenames.Left,
scalarMapRenames.Right, pb.NewFlowType(pb.NewTupleType(resultTypesArr)));

return ToWideStream(
Expand All @@ -183,21 +201,23 @@ THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T
}
case ETestedJoinAlgo::kBlockMap: {
TVector<ui32> kEmptyColumnDrops;
TVector<ui32> kRightDroppedColumns;
std::copy(descr.RightSource.KeyColumnIndexes.begin(), descr.RightSource.KeyColumnIndexes.end(),
std::back_inserter(kRightDroppedColumns));

return BuildBlockJoin(pb, kInnerJoin, args.Left, descr.LeftSource.KeyColumnIndexes, kEmptyColumnDrops,
args.Right, descr.RightSource.KeyColumnIndexes, kRightDroppedColumns, false);
return BuildBlockJoin(dqPb, joinKind, args.Left, descr.LeftSource.KeyColumnIndexes, kEmptyColumnDrops,
args.Right, descr.RightSource.KeyColumnIndexes, kEmptyColumnDrops, false);
}
case ETestedJoinAlgo::kBlockHash: {
return dqPb.DqBlockHashJoin(ToWideStream(dqPb, args.Left), ToWideStream(dqPb, args.Right), kInnerJoin,
TVector<TType*> blockResultTypes;
for (TType* type : resultTypesArr) {
blockResultTypes.push_back(pb.NewBlockType(type, TBlockType::EShape::Many));
}
blockResultTypes.push_back(dqPb.LastScalarIndexBlock());
return dqPb.DqBlockHashJoin(ToWideStream(dqPb, args.Left), ToWideStream(dqPb, args.Right), joinKind,
descr.LeftSource.KeyColumnIndexes, descr.RightSource.KeyColumnIndexes,
pb.NewStreamType(multiResultType));
pb.NewStreamType(pb.NewMultiType(blockResultTypes)));
}
case ETestedJoinAlgo::kScalarHash: {
return pb.FromFlow(dqPb.DqScalarHashJoin(
ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), kInnerJoin, descr.LeftSource.KeyColumnIndexes,
ToWideFlow(pb, args.Left), ToWideFlow(pb, args.Right), joinKind, descr.LeftSource.KeyColumnIndexes,
descr.RightSource.KeyColumnIndexes, pb.NewFlowType(multiResultType)));
}
default:
Expand All @@ -207,13 +227,12 @@ THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, T
return graphFrom(wideStream);
}

i32 ResultColumnCount(ETestedJoinAlgo algo, TInnerJoinDescription descr) {
i32 ResultColumnCount(ETestedJoinAlgo algo, TJoinDescription descr) {
/*
+1 in block case because
yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp:TBlockJoinState::GetOutputWidth();
*/
return IsBlockJoin(algo) + std::ssize(descr.LeftSource.ColumnTypes) + std::ssize(descr.RightSource.ColumnTypes) -
std::ssize(descr.LeftSource.KeyColumnIndexes);
return IsBlockJoin(algo) + std::ssize(descr.LeftSource.ColumnTypes) + std::ssize(descr.RightSource.ColumnTypes);
}

} // namespace NKikimr::NMiniKQL
7 changes: 4 additions & 3 deletions ydb/core/kqp/tools/join_perf/construct_join_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "benchmark_settings.h"
#include <ydb/library/yql/dq/comp_nodes/dq_program_builder.h>
#include <ydb/library/yql/dq/comp_nodes/ut/utils/dq_setup.h>
#include <ydb/library/yql/dq/comp_nodes/ut/utils/utils.h>

namespace NKikimr::NMiniKQL {

Expand All @@ -11,15 +12,15 @@ struct TJoinSourceData {
NYql::NUdf::TUnboxedValue ValuesList;
};

struct TInnerJoinDescription {
struct TJoinDescription {
TJoinSourceData LeftSource;
TJoinSourceData RightSource;
TDqSetup<false>* Setup;
};

bool IsBlockJoin(ETestedJoinAlgo algo);

THolder<IComputationGraph> ConstructInnerJoinGraphStream(ETestedJoinAlgo algo, TInnerJoinDescription descr);
THolder<IComputationGraph> ConstructJoinGraphStream(EJoinKind joinKind, ETestedJoinAlgo algo, TJoinDescription descr);

i32 ResultColumnCount(ETestedJoinAlgo algo, TInnerJoinDescription descr);
i32 ResultColumnCount(ETestedJoinAlgo algo, TJoinDescription descr);
} // namespace NKikimr::NMiniKQL
23 changes: 12 additions & 11 deletions ydb/core/kqp/tools/join_perf/joins.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ TVector<TString> GenerateStringKeyColumn(i32 size, i32 seed) {
}

template <typename KeyType>
NKikimr::NMiniKQL::TInnerJoinDescription PrepareDescription(NKikimr::NMiniKQL::TDqSetup<false>* setup,
TVector<KeyType> leftKeys, TVector<KeyType> rightKeys) {
NKikimr::NMiniKQL::TJoinDescription
PrepareDescription(NKikimr::NMiniKQL::TDqSetup<false>* setup, TVector<KeyType> leftKeys, TVector<KeyType> rightKeys) {
const int leftSize = std::ssize(leftKeys);
const int rightSize = std::ssize(rightKeys);
NKikimr::NMiniKQL::TInnerJoinDescription descr;
NKikimr::NMiniKQL::TJoinDescription descr;
descr.Setup = setup;
std::tie(descr.LeftSource.ColumnTypes, descr.LeftSource.ValuesList) = ConvertVectorsToRuntimeTypesAndValue(
*setup, std::move(leftKeys), TVector<ui64>(leftSize, 111), TVector<TString>(leftSize, "meow"));
std::tie(descr.LeftSource.ColumnTypes, descr.LeftSource.ValuesList) =
ConvertVectorsToRuntimeTypesAndValue(*setup, std::move(leftKeys), TVector<ui64>(leftSize, 111));
std::tie(descr.RightSource.ColumnTypes, descr.RightSource.ValuesList) =
ConvertVectorsToRuntimeTypesAndValue(*setup, std::move(rightKeys), TVector<TString>(rightSize, "woo"));
return descr;
Expand Down Expand Up @@ -64,15 +64,16 @@ TVector<TBenchmarkCaseResult> NKikimr::NMiniKQL::RunJoinsBench(const TBenchmarkS
for (auto keyPreset : params.Presets) {
for (auto sizes : keyPreset.Cases) {
NKikimr::NMiniKQL::TDqSetup<false> setup{NKikimr::NMiniKQL::GetPerfTestFactory()};
TInnerJoinDescription descr = [&] {
TJoinDescription descr = [&] {
using enum ETestedJoinKeyType;
switch (keyType) {

case kString: {
return PrepareDescription(&setup, GenerateStringKeyColumn(sizes.Left, 123),
return PrepareDescription(&setup, GenerateStringKeyColumn(sizes.Left, params.Seed),
GenerateStringKeyColumn(sizes.Right, 111));
}
case kInteger: {
return PrepareDescription(&setup, GenerateIntegerKeyColumn(sizes.Left, 123),
return PrepareDescription(&setup, GenerateIntegerKeyColumn(sizes.Left, params.Seed),
GenerateIntegerKeyColumn(sizes.Right, 111));
}
default:
Expand All @@ -85,9 +86,9 @@ TVector<TBenchmarkCaseResult> NKikimr::NMiniKQL::RunJoinsBench(const TBenchmarkS

TBenchmarkCaseResult result;
result.CaseName = CaseName(algo, keyType, keyPreset, sizes);

result.CaseName += Sprintf("_seed:_%i", params.Seed);
THolder<NKikimr::NMiniKQL::IComputationGraph> wideStreamGraph =
ConstructInnerJoinGraphStream(algo, descr);
ConstructJoinGraphStream(EJoinKind::Inner, algo, descr);
NYql::NUdf::TUnboxedValue wideStream = wideStreamGraph->GetValue();
std::vector<NYql::NUdf::TUnboxedValue> fetchBuff;
ui32 cols = NKikimr::NMiniKQL::ResultColumnCount(algo, descr);
Expand All @@ -106,7 +107,7 @@ TVector<TBenchmarkCaseResult> NKikimr::NMiniKQL::RunJoinsBench(const TBenchmarkS
}

result.RunDuration = TDuration::MicroSeconds(ThreadCPUTime() - timeStartMicroSeconds);
Cerr << ". Output line count: " << lineCount << Endl;
Cerr << Sprintf(". output line count: %i, time took: %ims.", lineCount, result.RunDuration.MilliSeconds()) << Endl;
ret.push_back(result);
}
}
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/kqp/tools/join_perf/ya.make
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PROGRAM(join_perf)
LIBRARY()

YQL_LAST_ABI_VERSION()

Expand All @@ -19,7 +19,6 @@ PEERDIR(
)

SRCS(
main.cpp
construct_join_graph.cpp
joins.cpp
benchmark_settings.cpp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/join/kqp_block_hash_join_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Y_UNIT_TEST_SUITE(KqpBlockHashJoin) {

auto resultSet = status.GetResultSets()[0];
// Current Join implementation is simple and returns all the rows
auto expectedRowsCount = UseBlockHashJoin ? 6 : 3;
auto expectedRowsCount = 3;
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), expectedRowsCount);

auto explainResult = queryClient.ExecuteQuery(
Expand Down
Loading
Loading