diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 7fa5c7e71912..34e618228301 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -175,7 +175,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { } TMaybeNode OptimizeEquiJoinWithCosts(TExprBase node, TExprContext& ctx) { - TCBOSettings settings { + TCBOSettings settings{ .MaxDPhypDPTableSize = Config->MaxDPHypDPTableSize.Get().GetOrElse(TDqSettings::TDefault::MaxDPHypDPTableSize), .ShuffleEliminationJoinNumCutoff = Config->ShuffleEliminationJoinNumCutoff.Get().GetOrElse(TDqSettings::TDefault::ShuffleEliminationJoinNumCutoff) }; diff --git a/ydb/core/kqp/ut/common/kqp_arg_parser.h b/ydb/core/kqp/ut/common/kqp_arg_parser.h new file mode 100644 index 000000000000..ed1728983e52 --- /dev/null +++ b/ydb/core/kqp/ut/common/kqp_arg_parser.h @@ -0,0 +1,266 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NKqp { + +class TArgs { +public: + template + class TRangedValueIter { + public: + TRangedValueIter(TValue current, TValue end, TValue step) + : Current_(current) + , End_(end) + , Step_(step) + { + } + + TValue operator*() const { + return Current_; + } + + TRangedValueIter& operator++() { + Current_ += Step_; + if (Current_ >= End_) { + Current_ = End_; + } + + return *this; + } + + bool operator!=(TRangedValueIter other) const { + assert(Step_ == other.Step_); + return Current_ != other.Current_; + } + + private: + TValue Current_; + TValue End_; + TValue Step_; + }; + + template + class TRangedValue { + public: + TRangedValue(TValue from, TValue to, TValue step) + : IsRange_(true) + , From_(from) + , To_(to) + , Step_(step) + { + } + + TRangedValue(TValue from) + : IsRange_(false) + , From_(from) + , To_(from) + , Step_(1) + { + } + + bool IsRange() const { + return IsRange_; + } + + TRangedValueIter end() const { + 
TValue End = To_ + 1; // immediately after the last + return TRangedValueIter{End, End, Step_}; + } + + TRangedValueIter begin() const { + return TRangedValueIter{From_, *end(), Step_}; + } + + TValue GetValue() const { + return From_; + } + + TValue GetFirst() const { + return From_; + } + + TValue GetLast() const { + return To_; + } + + TValue GetStep() const { + return Step_; + } + + private: + bool IsRange_; + + TValue From_; + TValue To_; + TValue Step_; + }; + +public: + TArgs(std::string input) + : Values_(ParseMap(input)) + { + } + + std::string GetString(std::string key) { + if (!HasArg(key)) { + throw std::out_of_range("arg not provided: '" + key + "'"); + } + return Values_[key]; + } + + std::string GetStringOrDefault(std::string key, std::string defaultValue) { + if (HasArg(key)) { + return GetString(key); + } + + return defaultValue; + } + + template + auto GetArg(std::string key) { + return ParseRangedValue(GetString(key)); + } + + template + auto GetArgOrDefault(std::string key, std::string defaultSerialized) { + if (HasArg(key)) { + return GetArg(key); + } + return ParseRangedValue(defaultSerialized); + } + + bool HasArg(std::string key) { + return Values_.contains(key); + } + +private: + std::map Values_; + +private: + static void LTrim(std::string& input) { + input.erase(input.begin(), std::find_if(input.begin(), input.end(), [](unsigned char ch) { + return !std::isspace(ch); + })); + } + + static void RTrim(std::string& input) { + input.erase(std::find_if(input.rbegin(), input.rend(), [](unsigned char ch) { + return !std::isspace(ch); + }).base(), input.end()); + } + + static void Trim(std::string& input) { + LTrim(input); + RTrim(input); + } + + static std::map ParseMap(const std::string& input, char delimiter = ';') { + std::map result; + std::stringstream ss(input); + + std::string entry; + while (std::getline(ss, entry, delimiter)) { + // each entry looks like key value pair, e.g. 
"N=5" + Trim(entry); + size_t pos = entry.find('='); + + if (pos != std::string::npos) { + std::string key = entry.substr(0, pos); + std::string value = entry.substr(pos + 1); + Trim(value); + result[std::move(key)] = std::move(value); + } + } + + return result; + } + + template + static auto ParseRangedValue(const std::string& input) { + // Check if it contains ".." + size_t dotdot = input.find(".."); + + if (dotdot == std::string::npos) { + // parse fixed value + auto value = ParseValue(input); + return TRangedValue{value}; + } else { + // parse ranged (with step or without) + size_t comma = input.find(','); + + auto to = ParseValue(input.substr(dotdot + 2)); + if (comma != std::string::npos && comma < dotdot) { + // parse ranges like "0.1,0.2..1.0" + auto first = ParseValue(input.substr(0, comma)); + auto second = ParseValue(input.substr(comma + 1, dotdot - comma - 1)); + auto step = second - first; + return TRangedValue{first, to, step}; + } + + // parse ranges like "1..100" + auto first = ParseValue(input.substr(0, dotdot)); + return TRangedValue{first, to, /*default step=*/1}; + } + } + + template + static auto ParseValue(const std::string& input) { + if constexpr (std::is_same_v) { + return std::stod(input); + } else if constexpr (std::is_same_v) { + return static_cast(std::stoull(input)); + } else if constexpr (std::is_same_v) { + return static_cast(std::stoll(input)); + } else if constexpr (std::is_same_v) { + return input; + } else if constexpr (std::is_same_v) { + return static_cast(ParseDuration(input).count()); + } else { + static_assert(false, "Unhandled type"); + } + } + + static std::chrono::nanoseconds ParseDuration(const std::string& input) { + std::regex pattern(R"((\d+(?:\.\d+)?)\s*(ns|us|ms|s|m|h))"); + std::smatch match; + + if (!std::regex_match(input, match, pattern)) { + throw std::invalid_argument("Invalid duration format"); + } + + double value = std::stod(match[1]); + std::string unit = match[2]; + + if (unit == "ns") { + return 
std::chrono::nanoseconds(static_cast(value)); + } + if (unit == "us") { + return std::chrono::microseconds(static_cast(value)); + } + if (unit == "ms") { + return std::chrono::milliseconds(static_cast(value)); + } + if (unit == "s") { + return std::chrono::seconds(static_cast(value)); + } + if (unit == "m") { + return std::chrono::minutes(static_cast(value)); + } + if (unit == "h") { + return std::chrono::hours(static_cast(value)); + } + + throw std::invalid_argument("Unknown unit"); + } +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/common/kqp_benches.cpp b/ydb/core/kqp/ut/common/kqp_benches.cpp new file mode 100644 index 000000000000..a2770401ee40 --- /dev/null +++ b/ydb/core/kqp/ut/common/kqp_benches.cpp @@ -0,0 +1,315 @@ +#include "kqp_benches.h" + +#include +#include + +namespace NKikimr::NKqp { + +std::string TComputedStatistics::GetCSVHeader() { + return "median,MAD,Q1,Q3,IQR,mean,stdev,n,min,max"; +} + +void TComputedStatistics::ToCSV(IOutputStream& os) const { + os << Median << "," << MAD + << "," << Q1 << "," << Q3 << "," << IQR + << "," << Mean << "," << Stdev + << "," << N << "," << Min << "," << Max; +} + +double CalculatePercentile(std::vector& data, double percentile) { + if (data.empty()) { + throw std::invalid_argument("At least one element needed to get percentiles"); + } + + double position = percentile * (data.size() - 1); + + // Get range of positions that are considered to be at this percentile + ui64 lower = std::floor(position); + ui64 upper = std::ceil(position); + double weight = position - lower; + + std::nth_element(data.begin(), data.begin() + lower, data.end()); + double lowerValue = data[lower]; + + // If position is precise, just get that element: + if (lower == upper) { + return lowerValue; + } + + // If it's a range, then interpolate linearly + // e.g. 
median of even number of elements is average of the central two + std::nth_element(data.begin(), data.begin() + upper, data.end()); + double upperValue = data[upper]; + + return lowerValue + weight * (upperValue - lowerValue); +} + +double CalculateMedian(std::vector& data) { + return CalculatePercentile(data, 0.5); +} + +double CalculateMAD(const std::vector& data, double median, std::vector& storage) { + storage.clear(); + storage.reserve(data.size()); + + for (double value : data) { + storage.push_back(abs(value - median)); + } + + return CalculatePercentile(storage, 0.5); +} + +double CalculateMean(const std::vector& data) { + return std::accumulate(data.begin(), data.end(), 0.0) / static_cast(data.size()); +} + +double CalculateSampleStdDev(const std::vector& data, double mean) { + if (data.size() == 0) { + return 0; + } + + double deviationSquaresSum = 0; + for (double value : data) { + double deviation = abs(value - mean); + deviationSquaresSum += deviation * deviation; + }; + + return std::sqrt(deviationSquaresSum / static_cast(data.size() - 1)); +} + +void TRunningStatistics::AddValue(double num) { + Samples_.push_back(num); + Total_ += num; + + if (!Min_ || !Max_) { + Min_ = num; + Max_ = num; + } + + Min_ = Min(*Min_, num); + Max_ = Max(*Max_, num); + + if (MaxHeap_.empty() || num <= MaxHeap_.top()) { + MaxHeap_.push(num); + } else { + MinHeap_.push(num); + } + + if (MaxHeap_.size() > MinHeap_.size() + 1) { + MinHeap_.push(MaxHeap_.top()); + MaxHeap_.pop(); + } else if (MinHeap_.size() > MaxHeap_.size()) { + MaxHeap_.push(MinHeap_.top()); + MinHeap_.pop(); + } +} + +double TRunningStatistics::GetMedian() const { + Y_ASSERT(!MaxHeap_.empty() || !MinHeap_.empty()); + + if (MaxHeap_.size() > MinHeap_.size()) { + return MaxHeap_.top(); + } else { + return (MaxHeap_.top() + MinHeap_.top()) / 2.0; + } +} + +double TRunningStatistics::CalculateMAD() const { + return ::NKikimr::NKqp::CalculateMAD(Samples_, GetMedian(), MADStorage_); +} + +TComputedStatistics 
TStatistics::ComputeStatistics() const { + double q1 = CalculatePercentile(Samples_, 0.25); + double q3 = CalculatePercentile(Samples_, 0.75); + double iqr = q3 - q1; + + double median = CalculateMedian(Samples_); + + // ComputeStatistics should be called once per statistics combiner at most. + // Otherwise we would hold on to storage instead of reallocating it each time: + std::vector storage; + double mad = ::NKikimr::NKqp::CalculateMAD(Samples_, median, storage); + + double mean = CalculateMean(Samples_); + double stdev = CalculateSampleStdDev(Samples_, mean); + + return { + .N = Samples_.size(), + .Min = Min_, + .Max = Max_, + .Median = median, + .MAD = mad, + .Q1 = q1, + .Q3 = q3, + .IQR = iqr, + .Mean = mean, + .Stdev = stdev, + }; +} + +void TStatistics::Merge(const TStatistics& other) { + Samples_.insert(Samples_.end(), other.Samples_.begin(), other.Samples_.end()); + Min_ = std::min(Min_, other.Min_); + Max_ = std::max(Max_, other.Max_); +} + +TStatistics TStatistics::operator-(const TStatistics& statsRHS) const { + auto op = [](double lhs, double rhs) { + return lhs - rhs; + }; + + return TStatistics{ + Combine(statsRHS, op), + Min_ - statsRHS.Max_, + Max_ - statsRHS.Min_ + }; +} + +TStatistics TStatistics::operator+(const TStatistics& statsRHS) const { + auto op = [](double lhs, double rhs) { + return lhs - rhs; + }; + + return TStatistics{ + Combine(statsRHS, op), + Min_ + statsRHS.Min_, + Max_ + statsRHS.Max_ + }; +} + +TStatistics TStatistics::operator*(const TStatistics& statsRHS) const { + auto op = [](double lhs, double rhs) { + return lhs - rhs; + }; + + double candidates[] = { + Min_ * statsRHS.Min_, + Min_ * statsRHS.Max_, + Max_ * statsRHS.Min_, + Max_ * statsRHS.Max_, + }; + + return TStatistics{ + Combine(statsRHS, op), + *std::min_element(candidates, candidates + Y_ARRAY_SIZE(candidates)), + *std::max_element(candidates, candidates + Y_ARRAY_SIZE(candidates)) + }; +} + +TStatistics TStatistics::operator/(const TStatistics& statsRHS) const { 
+ auto op = [](double lhs, double rhs) { + return lhs / rhs; + }; + + double candidates[] = { + Min_ / statsRHS.Min_, + Min_ / statsRHS.Max_, + Max_ / statsRHS.Min_, + Max_ / statsRHS.Max_, + }; + + return TStatistics{ + Combine(statsRHS, op), + *std::min_element(candidates, candidates + Y_ARRAY_SIZE(candidates)), + *std::max_element(candidates, candidates + Y_ARRAY_SIZE(candidates)) + }; +} + +static std::pair SelectUnit(ui64 nanoseconds) { + if (nanoseconds >= 1'000'000'000) { + return {"s", 1e9}; + } else if (nanoseconds >= 1'000'000) { + return {"ms", 1e6}; + } else if (nanoseconds >= 1'000) { + return {"μs", 1e3}; + } else { + return {"ns", 1.0}; + } +} + +static int GetDecimalPlaces(double scaledUncertainty) { + if (scaledUncertainty < 0.01) { + return 2; + } + + double log = std::log10(scaledUncertainty); + int magnitude = static_cast(std::floor(log)); + + if (scaledUncertainty >= 100.0) { + return 0; + } else if (scaledUncertainty >= 10.0) { + return 1; + } else if (scaledUncertainty >= 1.0) { + return 1; + } else { + return std::max(0, -magnitude + 1); + } +} + +static int GetDecimalPlacesForValue(double scaledValue) { + if (scaledValue < 0.01) { + return 4; + } + + double absValue = std::abs(scaledValue); + + if (absValue >= 1000.0) { + return 0; + } else if (absValue >= 100.0) { + return 1; + } else if (absValue >= 10.0) { + return 2; + } else if (absValue >= 1.0) { + return 2; + } else if (absValue >= 0.1) { + return 3; + } else { + return 4; + } +} + +std::string TimeFormatter::Format(ui64 valueNs, ui64 uncertaintyNs) { + auto [unit, scale] = SelectUnit(valueNs); + + double scaledValue = valueNs / scale; + double scaledUncertainty = uncertaintyNs / scale; + + int decimalPlaces = GetDecimalPlaces(scaledUncertainty); + + std::ostringstream oss; + oss << std::fixed << std::setprecision(decimalPlaces); + oss << scaledValue << " " << unit << " ± " << scaledUncertainty << " " << unit; + + return oss.str(); +} + +std::string TimeFormatter::Format(ui64 
valueNs) { + auto [unit, scale] = SelectUnit(valueNs); + double scaledValue = valueNs / scale; + + int decimalPlaces = GetDecimalPlacesForValue(scaledValue); + + std::ostringstream oss; + oss << std::fixed << std::setprecision(decimalPlaces); + oss << scaledValue << " " << unit; + + return oss.str(); +} + +void DumpTimeStatistics(const TComputedStatistics& stats, IOutputStream& os) { + os << "Median = " << TimeFormatter::Format(stats.Median, stats.MAD) << " (MAD)\n"; + + os << "Q1 = " << TimeFormatter::Format(stats.Q1) + << ", Q3 = " << TimeFormatter::Format(stats.Q3) + << ", IQR = " << TimeFormatter::Format(stats.IQR) << "\n"; + + os << "N = " << stats.N << "\n"; + os << "Min, Max = [ " + << TimeFormatter::Format(stats.Min) + << ", " << TimeFormatter::Format(stats.Max) + << " ]\n"; + + os << "Mean = " << TimeFormatter::Format(stats.Mean, stats.Stdev) << " (Std. dev.)\n"; +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/common/kqp_benches.h b/ydb/core/kqp/ut/common/kqp_benches.h new file mode 100644 index 000000000000..66b2ff1307ee --- /dev/null +++ b/ydb/core/kqp/ut/common/kqp_benches.h @@ -0,0 +1,294 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace NKikimr::NKqp { + +template +ui64 MeasureTimeNanos(Lambda&& lambda) { + namespace Nsc = std::chrono; + using TClock = Nsc::high_resolution_clock; + + Nsc::time_point start = TClock::now(); + lambda(); + return Nsc::duration_cast(TClock::now() - start).count(); +} + +struct TComputedStatistics { + ui64 N; + double Min, Max; + + double Median; + double MAD; + + double Q1; + double Q3; + double IQR; + + double Mean; + double Stdev; + + static std::string GetCSVHeader(); + void ToCSV(IOutputStream& os) const; +}; + +double CalculatePercentile(const std::vector& data, double percentile); +double CalculateMedian(std::vector& data); +double CalculateMAD(const std::vector& data, double median, std::vector& storage); +double 
CalculateMean(const std::vector& data); +double CalculateSampleStdDev(const std::vector& data, double mean); + +class TStatistics { +public: + TStatistics(std::vector samples, double min, double max) + : Samples_(std::move(samples)) + , Min_(min) + , Max_(max) + { + } + + TComputedStatistics ComputeStatistics() const; + + ui64 GetN() const { + return Samples_.size(); + } + + double GetMin() const { + return Min_; + } + + double GetMax() const { + return Max_; + } + + void Merge(const TStatistics& other); + + // Operations on two random values, very expensive. + // Each operation produces cartesian product of all possibilities if + // it's small enough or fallbacks to Monte Carlo methods if not. + TStatistics operator-(const TStatistics& rhs) const; + TStatistics operator+(const TStatistics& rhs) const; + TStatistics operator*(const TStatistics& rhs) const; + TStatistics operator/(const TStatistics& rhs) const; + + // Only gives precisely correct min & max if you map with monotonic function + template + TStatistics Map(TMapLambda&& map) { + std::vector samples; + double min = std::numeric_limits::max(); + double max = std::numeric_limits::min(); + + for (double value : Samples_) { + double newValue = map(value); + samples.push_back(newValue); + + min = std::min(newValue, min); + max = std::max(newValue, max); + } + + double candidates[] = {min, max, map(Min_), map(Max_)}; + return { + std::move(samples), + *std::min_element(candidates, candidates + Y_ARRAY_SIZE(candidates)), + *std::max_element(candidates, candidates + Y_ARRAY_SIZE(candidates)) + }; + } + + template + TStatistics Filter(TFilterLambda&& filter) const { + std::vector samples; + + // That's an upper bound since this function can only remove elements. + // This resize can be memory inefficient, but is likely faster. 
+ samples.reserve(Samples_.size()); + + double min = std::numeric_limits::max(); + double max = std::numeric_limits::min(); + + for (double value : Samples_) { + if (!filter(value)) { + continue; + } + + samples.push_back(value); + + min = std::min(value, min); + max = std::max(value, max); + } + + double globalMin = filter(Min_) ? Min_ : min; + double globalMax = filter(Max_) ? Max_ : max; + + return {std::move(samples), globalMin, globalMax}; + } + +private: + mutable std::vector Samples_; + double Min_; + double Max_; + + // This lets us operate on two random values, it computes distribution of values after arbitrary operation + template + std::vector + Combine(const TStatistics& rhs, TCombiner&& combine, ui64 maxSamples = 1e8) const { + std::vector combined; + + ui64 cartesianProductSize = Samples_.size() * rhs.Samples_.size(); + + if (cartesianProductSize < maxSamples) { + combined.reserve(cartesianProductSize); + + for (ui64 i = 0; i < Samples_.size(); ++ i) { + for (ui64 j = 0; j < rhs.Samples_.size(); ++ j) { + combined.push_back(combine(Samples_[i], rhs.Samples_[j])); + } + } + + return combined; + } + + // Fallback to Monte Carlo if the sample size is too big: + std::random_device randomDevice; + std::mt19937 rng(/*seed=*/randomDevice()); + + std::uniform_int_distribution<> distributionLHSIdx(0, Samples_.size() - 1); + std::uniform_int_distribution<> distributionRHSIdx(0, rhs.Samples_.size() - 1); + + for (ui64 repeatIdx = 0; repeatIdx < maxSamples; ++ repeatIdx) { + ui64 i = distributionLHSIdx(rng); + ui64 j = distributionRHSIdx(rng); + + combined.push_back(combine(Samples_[i], rhs.Samples_[j])); + } + + return combined; + } +}; + +class TRunningStatistics { +public: + TRunningStatistics() + : MinHeap_() + , MaxHeap_() + , Samples_() + , MADStorage_() + , Total_(0) + , Min_(std::nullopt) + , Max_(std::nullopt) + { + } + + void AddValue(double num); + + double GetMedian() const; + double CalculateMAD() const; + + double GetTotal() const { + return Total_; 
+ } + + ui32 GetN() const { + return Samples_.size(); + } + + double GetMin() const { + Y_ASSERT(Min_); + return *Min_; + } + + double GetMax() const { + Y_ASSERT(Max_); + return *Max_; + } + + TStatistics GetStatistics() { + Y_ASSERT(GetN() > 0); + return {Samples_, *Min_, *Max_}; + } + +private: + std::priority_queue, std::greater> MinHeap_; + std::priority_queue> MaxHeap_; + + std::vector Samples_; + + // This vector is used to not allocate memory every time + // we are requested to compute MAD and need to calculate diviations + mutable std::vector MADStorage_; + + double Total_; + + std::optional Min_; + std::optional Max_; +}; + +class TimeFormatter { +public: + static std::string Format(uint64_t valueNs, ui64 uncertaintyNs); + static std::string Format(uint64_t valueNs); +}; + +void DumpTimeStatistics(const TComputedStatistics& stats, IOutputStream& os); + +struct TRepeatedTestConfig { + ui64 MinRepeats; + ui64 MaxRepeats; + ui64 Timeout; +}; + +template +std::optional RepeatedTest(TRepeatedTestConfig config, ui64 singleRunTimeout, double thresholdMAD, TLambda&& lambda) { + Y_ASSERT(config.MinRepeats >= 1); + + TRunningStatistics stats; + do { + bool hasTimedOut = false; + ui64 ellapsedTime = MeasureTimeNanos([&]() { hasTimedOut = !lambda(); }); + + if (hasTimedOut) { + return std::nullopt; + } + + if (ellapsedTime > singleRunTimeout) { + break; + } + + stats.AddValue(ellapsedTime); + } while ((stats.GetN() < config.MaxRepeats) && + (stats.GetN() < config.MinRepeats || + (stats.GetTotal() + stats.GetMedian() < config.Timeout && + stats.CalculateMAD() / stats.GetMedian() > thresholdMAD))); + + return std::move(stats).GetStatistics(); +} + +struct TBenchmarkConfig { + TRepeatedTestConfig Warmup; + TRepeatedTestConfig Bench; + ui64 SingleRunTimeout; + double MADThreshold; +}; + +template +std::optional Benchmark(TBenchmarkConfig config, Lambda&& lambda) { + if (config.Warmup.MaxRepeats != 0) { + auto warmupStats = RepeatedTest(config.Warmup, 
config.SingleRunTimeout, config.MADThreshold, lambda); + if (!warmupStats) { + return std::nullopt; + } + } + + return RepeatedTest(config.Bench, config.SingleRunTimeout, config.MADThreshold, lambda); +} + +} // end namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/common/kqp_serializable_rng.h b/ydb/core/kqp/ut/common/kqp_serializable_rng.h new file mode 100644 index 000000000000..8c21ad745859 --- /dev/null +++ b/ydb/core/kqp/ut/common/kqp_serializable_rng.h @@ -0,0 +1,108 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NKqp { + +// wrapper around std::mt19937 that tracks usage and simplifies serialization +class TSerializableMT19937 { +public: // compatibility with std::mt19937 + using result_type = std::mt19937::result_type; + static constexpr auto default_seed = std::mt19937::default_seed; + +public: + TSerializableMT19937() + : TSerializableMT19937(default_seed) + { + } + + TSerializableMT19937(uint32_t seed) + : Engine_(seed) + , Seed_(seed) + , Counter_(0) + { + } + + uint64_t Serialize() const { + return (static_cast(Seed_) << 32ULL) | static_cast(Counter_); + } + + void Restore(uint64_t state) { + Seed_ = static_cast(state >> 32); + Counter_ = static_cast(state & 0xFFFFFFFF); + + Engine_.seed(Seed_); + Engine_.discard(Counter_); + } + + void Forward(uint32_t counter) { + assert(counter >= GetCounter()); + ui32 difference = counter - GetCounter(); + discard(difference); + } + + uint32_t GetCounter() const { + return Counter_; + } + + uint32_t GetSeed() const { + return Seed_; + } + + static TSerializableMT19937 Deserialize(uint64_t key) { + TSerializableMT19937 mt; + mt.Restore(key); + + return mt; + } + +public: // compatibility with std::mt19937 + static constexpr auto min() { + return std::mt19937::min(); + } + static constexpr auto max() { + return std::mt19937::max(); + } + + auto operator()() { + assert(Counter_ != UINT32_MAX); + ++ Counter_; + return Engine_(); + } + + void seed(uint32_t seed) { + Seed_ = seed; + Counter_ = 
0; + + Engine_.seed(seed); + } + + void discard(uint64_t n) { + assert(n <= static_cast(UINT32_MAX - Counter_)); + + Counter_ += static_cast(n); + Engine_.discard(n); + } + + void reset() { + Counter_ = 0; + Engine_.seed(Seed_); + } + + bool operator==(const TSerializableMT19937& other) const { + return Seed_ == other.Seed_ && Counter_ == other.Counter_; + } + + bool operator!=(const TSerializableMT19937& other) const { + return !(*this == other); + } + +private: + std::mt19937 Engine_; + uint32_t Seed_; + uint32_t Counter_; +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/common/ya.make b/ydb/core/kqp/ut/common/ya.make index 64e07aed5aa0..ec6556b78113 100644 --- a/ydb/core/kqp/ut/common/ya.make +++ b/ydb/core/kqp/ut/common/ya.make @@ -12,6 +12,7 @@ SRCS( math_udf.cpp unicode_udf.cpp digest_udf.cpp + kqp_benches.cpp ) PEERDIR( diff --git a/ydb/core/kqp/ut/join/kqp_benches_ut.cpp b/ydb/core/kqp/ut/join/kqp_benches_ut.cpp new file mode 100644 index 000000000000..a4f7fb20cef3 --- /dev/null +++ b/ydb/core/kqp/ut/join/kqp_benches_ut.cpp @@ -0,0 +1,115 @@ +#include "kqp_join_topology_generator.h" + +#include +#include + +namespace NKikimr::NKqp { + +Y_UNIT_TEST_SUITE(KqpBenches) { + + void CheckStats(TRunningStatistics& stats, + ui32 n, double min, double max, + double median, double mad, + double q1, double q3, double iqr, + double mean, double stdev) { + + const double TOLERANCE = 0.0001; + + UNIT_ASSERT_EQUAL(stats.GetN(), n); + UNIT_ASSERT_DOUBLES_EQUAL(stats.GetMin(), min, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(stats.GetMax(), max, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(stats.GetMedian(), median, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(stats.CalculateMAD(), mad, TOLERANCE); + + auto statistics = stats.GetStatistics(); + UNIT_ASSERT_EQUAL(statistics.GetN(), n); + UNIT_ASSERT_DOUBLES_EQUAL(statistics.GetMin(), min, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(statistics.GetMax(), max, TOLERANCE); + + auto report = statistics.ComputeStatistics(); + 
UNIT_ASSERT_EQUAL(report.N, n); + UNIT_ASSERT_DOUBLES_EQUAL(report.Min, min, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.Max, max, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.Median, median, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.MAD, mad, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.Q1, q1, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.Q3, q3, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.IQR, iqr, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.Mean, mean, TOLERANCE); + UNIT_ASSERT_DOUBLES_EQUAL(report.Stdev, stdev, TOLERANCE); + } + + Y_UNIT_TEST(TStatistics) { + TRunningStatistics stats; + + stats.AddValue(1); + stats.AddValue(1); + CheckStats(stats, 2, 1, 1, 1, 0, 1, 1, 0, 1, 0); + + stats.AddValue(1); + CheckStats(stats, 3, 1, 1, 1, 0, 1, 1, 0, 1, 0); + + stats.AddValue(1); + CheckStats(stats, 4, 1, 1, 1, 0, 1, 1, 0, 1, 0); + + stats.AddValue(3); + CheckStats(stats, 5, 1, 3, 1, 0, 1, 1, 0, 1.4000, 0.8944); + + stats.AddValue(5); + CheckStats(stats, 6, 1, 5, 1, 0, 1, 2.5000, 1.5000, 2, 1.6733); + + stats.AddValue(7); + CheckStats(stats, 7, 1, 7, 1, 0, 1, 4, 3, 2.7143, 2.4300); + + stats.AddValue(7); + CheckStats(stats, 8, 1, 7, 2, 1, 1, 5.5000, 4.5000, 3.2500, 2.7124); + + stats.AddValue(8); + CheckStats(stats, 9, 1, 8, 3, 2, 1, 7, 6, 3.7778, 2.9907); + + stats.AddValue(100); + CheckStats(stats, 10, 1, 100, 4, 3, 1, 7, 6, 13.4000, 30.5585); + + stats.AddValue(12); + CheckStats(stats, 11, 1, 100, 5, 4, 1, 7.5000, 6.5000, 13.2727, 28.9934); + + stats.AddValue(15); + CheckStats(stats, 12, 1, 100, 6, 5, 1, 9, 8, 13.4167, 27.6486); + + stats.AddValue(11); + CheckStats(stats, 13, 1, 100, 7, 5, 1, 11, 10, 13.2308, 26.4800); + + stats.AddValue(18); + CheckStats(stats, 14, 1, 100, 7, 5.5000, 1.5000, 11.7500, 10.2500, 13.5714, 25.4731); + + stats.AddValue(19); + CheckStats(stats, 15, 1, 100, 7, 6, 2, 13.5000, 11.5000, 13.9333, 24.5865); + + stats.AddValue(14); + CheckStats(stats, 16, 1, 100, 7.5000, 6.5000, 2.5000, 14.2500, 11.7500, 13.9375, 23.7528); + 
+ stats.AddValue(21); + CheckStats(stats, 17, 1, 100, 8, 7, 3, 15, 12, 14.3529, 23.0623); + + stats.AddValue(9); + CheckStats(stats, 18, 1, 100, 8.5000, 6, 3.5000, 14.7500, 11.2500, 14.0556, 22.4092); + + stats.AddValue(25); + CheckStats(stats, 19, 1, 100, 9, 6, 4, 16.5000, 12.5000, 14.6316, 21.9221); + + stats.AddValue(17); + CheckStats(stats, 20, 1, 100, 10, 7, 4.5000, 17.2500, 12.7500, 14.7500, 21.3440); + + stats.AddValue(18); + CheckStats(stats, 21, 1, 100, 11, 7, 5, 18, 13, 14.9048, 20.8156); + + stats.AddValue(10); + CheckStats(stats, 22, 1, 100, 10.5000, 7, 5.5000, 17.7500, 12.2500, 14.6818, 20.3409); + + stats.AddValue(22); + CheckStats(stats, 23, 1, 100, 11, 7, 6, 18, 12, 15, 19.9317); + } + +} // Y_UNIT_TEST_SUITE(KqpBenches) + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/join/kqp_join_topology_generator.cpp b/ydb/core/kqp/ut/join/kqp_join_topology_generator.cpp new file mode 100644 index 000000000000..6ec9eb6d9d0b --- /dev/null +++ b/ydb/core/kqp/ut/join/kqp_join_topology_generator.cpp @@ -0,0 +1,743 @@ +#include "kqp_join_topology_generator.h" + +#include +#include +#include +#include +#include + +namespace NKikimr::NKqp { + +static std::string getTableName(unsigned tableID) { + return TLexicographicalNameGenerator::getName(tableID, /*lowerCase=*/false); +} + +static std::string getColumnName(unsigned tableID, unsigned columnID) { + return TLexicographicalNameGenerator::getName(tableID, /*lowerCase=*/true) + "_" + + TLexicographicalNameGenerator::getName(columnID, /*lowerCase=*/true); +} + +static std::string getRelationName(unsigned tableID, unsigned columnID) { + return getTableName(tableID) + "." 
+ getColumnName(tableID, columnID); +} + +static std::string getTablePath(unsigned tableID) { + return "/Root/" + getTableName(tableID); +} + +static double getRandomNormalizedDouble(TRNG& rng) { + std::uniform_real_distribution<> distribution(0.0, 1.0); + return distribution(rng); +} + +static std::vector GetPitmanYor(TRNG& rng, ui32 sum, TPitmanYorConfig config) { + std::vector keyCounts{/*initial key=*/1}; + + for (ui32 column = 1; column < sum; ++ column) { + double random = getRandomNormalizedDouble(rng); + + double cumulative = 0.0; + int chosenTableIndex = -1; + + for (ui32 i = 0; i < keyCounts.size(); ++ i) { + cumulative += (keyCounts[i] - config.Alpha) / (column + config.Theta); + if (random < cumulative) { + chosenTableIndex = i; + break; + } + } + + if (chosenTableIndex == -1) { + keyCounts.push_back(1); + } else { + ++ keyCounts[chosenTableIndex]; + } + } + + std::sort(keyCounts.begin(), keyCounts.end(), std::greater{}); + return keyCounts; +} + +void TRelationGraph::SetupKeysPitmanYor(TRNG& rng, TPitmanYorConfig config) { + std::vector, /*last index*/ ui32>> distributions(GetN()); + for (ui32 i = 0; i < GetN(); ++ i) { + auto distribution = GetPitmanYor(rng, AdjacencyList_[i].size(), config); + Schema_[i] = TTable{static_cast(distribution.size())}; + distributions[i] = {std::move(distribution), /*initial index=*/0}; + } + + auto GetKey = [&](ui32 node) { + auto& [nodeDistribution, lastIndex] = distributions[node]; + ui32 key = lastIndex; + + Y_ASSERT(lastIndex < nodeDistribution.size()); + Y_ASSERT(nodeDistribution[lastIndex] != 0); + if (-- nodeDistribution[lastIndex] == 0) { + ++ lastIndex; + } + + return key; + }; + + std::set> visited; + + for (ui32 u = 0; u < GetN(); ++ u) { + for (ui32 i = 0; i < AdjacencyList_[u].size(); ++ i) { + TEdge* forwardEdge = &AdjacencyList_[u][i]; + ui32 v = forwardEdge->Target; + + if (visited.contains({v, u}) || visited.contains({u, v})) { + continue; + } + + visited.insert({u, v}); + + // find backward edge + 
TEdge* backwardEdge = nullptr; + std::vector& targetAdjacency = AdjacencyList_[forwardEdge->Target]; + for (ui32 k = 0; k < targetAdjacency.size(); ++ k) { + if (targetAdjacency[k].Target == u) { + backwardEdge = &targetAdjacency[k]; + } + } + + ui32 ColumnLHS = GetKey(u); + ui32 ColumnRHS = GetKey(v); + + forwardEdge->ColumnLHS = ColumnLHS; + forwardEdge->ColumnRHS = ColumnRHS; + backwardEdge->ColumnLHS = ColumnRHS; + backwardEdge->ColumnRHS = ColumnLHS; + } + } +} + +TSchema TSchema::MakeWithEnoughColumns(unsigned numNodes) { + return TSchema{std::vector(numNodes, TTable{numNodes})}; +} + +std::string TSchema::MakeCreateQuery() const { + std::string prerequisites; + for (unsigned i = 0; i < Tables_.size(); ++ i) { + prerequisites += "CREATE TABLE `" + getTablePath(i) + "` (\n"; + + for (unsigned j = 0; j < Tables_[i].GetNumColumns(); ++ j) { + prerequisites += " " + getColumnName(i, j) + " Int32 NOT NULL,\n"; + } + prerequisites += " PRIMARY KEY (" + getColumnName(i, 0) + ")\n"; + + prerequisites += ") WITH (STORE = COLUMN);\n"; + } + + return prerequisites; +} + +std::string TSchema::MakeDropQuery() const { + std::string query; + for (unsigned i = 0; i < Tables_.size(); ++ i) { + query += "DROP TABLE `" + getTablePath(i) + "`;\n"; + } + + return query; +} + +void TSchema::Rename(std::vector oldToNew) { + std::vector newTables(Tables_.size()); + + for (unsigned i = 0; i < oldToNew.size(); ++i) { + newTables[oldToNew[i]] = Tables_[i]; + } + + Tables_ = newTables; +} + +void TRelationGraph::Connect(unsigned lhs, unsigned rhs) { + AdjacencyList_[lhs].push_back({/*Target=*/rhs, 0, 0}); + AdjacencyList_[rhs].push_back({/*Target=*/lhs, 0, 0}); +} + +void TRelationGraph::Disconnect(unsigned u, unsigned v) { + auto& adjacencyU = AdjacencyList_[u]; + auto& adjacencyV = AdjacencyList_[v]; + adjacencyU.erase(std::remove_if(adjacencyU.begin(), adjacencyU.end(), [v](TEdge edge) { return edge.Target == v; }), adjacencyU.end()); + 
adjacencyV.erase(std::remove_if(adjacencyV.begin(), adjacencyV.end(), [u](TEdge edge) { return edge.Target == u; }), adjacencyV.end()); +} + +bool TRelationGraph::HasEdge(unsigned u, unsigned v) const { + for (ui32 i = 0; i < AdjacencyList_[u].size(); ++i) { + if (AdjacencyList_[u][i].Target == v) { + return true; + } + } + + return false; +} + +std::vector TRelationGraph::FindComponents() const { + std::vector component(GetN(), -1); + int numComponents = 0; + + for (unsigned start = 0; start < GetN(); ++ start) { + if (component[start] != -1) { + continue; + } + + std::queue queue; + queue.push(start); + component[start] = numComponents; + + while (!queue.empty()) { + unsigned u = queue.front(); + queue.pop(); + for (TEdge edge : AdjacencyList_[u]) { + unsigned v = edge.Target; + if (component[v] == -1) { + component[v] = numComponents; + queue.push(v); + } + } + } + + ++ numComponents; + } + + return component; +} + +std::string TRelationGraph::MakeQuery() const { + std::string fromClause; + std::string joinClause; + for (unsigned i = 0; i < AdjacencyList_.size(); ++ i) { + std::string currentJoin = + "JOIN " + getTableName(i) + " ON"; + + bool hasJoin = false; + auto addJoinCodition = [&](int j) { + if (hasJoin) { + currentJoin += " AND"; + } + + hasJoin = true; + + currentJoin += " " + + getRelationName(i, AdjacencyList_[i][j].ColumnLHS) + " = " + + getRelationName(AdjacencyList_[i][j].Target, AdjacencyList_[i][j].ColumnRHS); + }; + + for (unsigned j = 0; j < AdjacencyList_[i].size(); ++ j) { + if (i < AdjacencyList_[i][j].Target) { + continue; + } + + addJoinCodition(j); + } + currentJoin += "\n"; + + if (hasJoin) { + joinClause += currentJoin; + } else if (fromClause.empty()) { + fromClause = "SELECT *\nFROM " + getTableName(i) + "\n"; + } else { + joinClause += "CROSS JOIN " + getTableName(i) + "\n"; + } + } + + // remove extra '\n' from last join + if (!joinClause.empty()) { + Y_ASSERT(joinClause.back() == '\n'); + joinClause.pop_back(); + } + + return 
std::move(fromClause) + std::move(joinClause) + ";\n"; +} + +void TRelationGraph::DumpGraph(IOutputStream& os) const { + os << "graph {\n"; + for (unsigned i = 0; i < AdjacencyList_.size(); ++ i) { + for (unsigned j = 0; j < AdjacencyList_[i].size(); ++ j) { + const TEdge& edge = AdjacencyList_[i][j]; + if (i <= edge.Target) { + os << " " << getTableName(i) << " -- " << getTableName(edge.Target) + << " [label = \"" + << getRelationName(i, edge.ColumnLHS) << " = " + << getRelationName(edge.Target, edge.ColumnRHS) + << "\"];\n"; + } + } + } + + os << "}\n"; +} + +std::vector TRelationGraph::GetDegrees() const { + std::vector degrees(AdjacencyList_.size()); + for (unsigned i = 0; i < AdjacencyList_.size(); ++i) { + degrees[i] = AdjacencyList_[i].size(); + } + + std::sort(degrees.begin(), degrees.end()); + return degrees; +} + +void TRelationGraph::ReorderDFS() { + std::vector visited(GetN(), false); + std::vector newOrder; + newOrder.reserve(GetN()); + + auto searchDepthFirst = [&](auto&& self, unsigned node) { + if (visited[node]) { + return; + } + + visited[node] = true; + newOrder.push_back(node); + + for (TEdge edge : AdjacencyList_[node]) { + self(self, edge.Target); + } + }; + + for (unsigned i = 0; i < GetN(); i++) { + searchDepthFirst(searchDepthFirst, i); + } + + std::vector oldToNew(GetN()); + for (unsigned i = 0; i < GetN(); i++) { + oldToNew[newOrder[i]] = i; + } + + Rename(oldToNew); + Schema_.Rename(oldToNew); +} + +void TRelationGraph::Rename(const std::vector& oldToNew) { + TAdjacencyList newGraph(GetN()); + for (unsigned u = 0; u < GetN(); ++u) { + for (TEdge edge : AdjacencyList_[u]) { + unsigned v = edge.Target; + + edge.Target = oldToNew[v]; + newGraph[oldToNew[u]].push_back(edge); + } + } + + AdjacencyList_ = newGraph; +} + +ui32 TRelationGraph::GetNumEdges() const { + ui32 numEdges = 0; + for (auto edges : AdjacencyList_) { + numEdges += edges.size(); + } + + return numEdges; +} + +int TRelationGraph::GetNumComponents() const { + auto comp = 
FindComponents(); + return comp.empty() ? 0 : *std::max_element(comp.begin(), comp.end()) + 1; +} + +TSchemaStats TSchemaStats::MakeRandom(TRNG& rng, const TSchema& schema, unsigned a, unsigned b) { + std::uniform_int_distribution<> distribution(a, b); + std::vector stats(schema.GetSize()); + + for (unsigned i = 0; i < schema.GetSize(); ++i) { + unsigned RowSize = std::pow(10, distribution(rng)); + unsigned ByteSize = RowSize * 64; + stats[i] = {ByteSize, RowSize}; + } + + return TSchemaStats{stats}; +} + +std::string TSchemaStats::ToJSON() const { + std::stringstream ss; + + ss << "{"; + for (unsigned i = 0; i < Stats_.size(); ++i) { + if (i != 0) { + ss << ","; + } + + ss << "\"" << getTablePath(i) << "\": "; + ss << "{"; + ss << "\"" << "n_rows" << "\": " << Stats_[i].RowSize << ", "; + ss << "\"" << "byte_size" << "\": " << Stats_[i].ByteSize; + ss << "}"; + } + ss << "}"; + + return ss.str(); +} + +TRelationGraph GeneratePath(unsigned numNodes) { + TRelationGraph graph(numNodes); + + unsigned lastVertex = 0; + bool first = true; + for (unsigned i = 0; i < numNodes; ++i) { + if (!first) { + graph.Connect(lastVertex, i); + } + + first = false; + lastVertex = i; + } + + return graph; +} + +TRelationGraph GenerateStar(unsigned numNodes) { + TRelationGraph graph(numNodes); + + unsigned root = 0; + for (unsigned i = 1; i < numNodes; ++i) { + graph.Connect(root, i); + } + + return graph; +} + +TRelationGraph GenerateClique(unsigned numNodes) { + TRelationGraph graph(numNodes); + + for (unsigned i = 0; i < numNodes; ++i) { + for (unsigned j = i + 1; j < numNodes; ++j) { + graph.Connect(i, j); + } + } + + return graph; +} + +TRelationGraph GenerateTreeFromPruferSequence(const std::vector& prufer) { + unsigned n = prufer.size() + 2; + + std::vector degree(n, 1); + for (unsigned i : prufer) { + ++ degree[i]; + } + + TRelationGraph graph(n); + + for (unsigned u : prufer) { + for (unsigned v = 0; v < n; ++ v) { + if (degree[v] == 1) { + graph.Connect(u, v); + + -- 
degree[v]; + -- degree[u]; + break; + } + } + } + + int u = -1; + unsigned v = 0; + for (; v < n; ++ v) { + if (degree[v] == 1) { + if (u != -1) { + graph.Connect(u, v); + break; + } + + u = v; + } + } + + return graph; +} + + +static std::vector GenerateRandomPruferSequence(TRNG& rng, unsigned numNodes) { + Y_ASSERT(numNodes >= 2); + std::uniform_int_distribution<> distribution(0, numNodes - 1); + + std::vector prufer(numNodes - 2); + for (unsigned i = 0; i < numNodes - 2; ++i) { + prufer[i] = distribution(rng); + } + + return prufer; +} + +TRelationGraph GenerateRandomTree(TRNG& rng, unsigned numNodes) { + auto prufer = GenerateRandomPruferSequence(rng, numNodes); + return GenerateTreeFromPruferSequence(prufer); +} + +static void NormalizeProbabilities(std::vector& probabilities) { + double sum = std::accumulate(probabilities.begin(), probabilities.end(), 0.0); + if (sum > 0.0) { + for (double& probability : probabilities) { + probability /= sum; + } + } +} + +std::vector SampleFromPMF(TRNG& rng, const std::vector& probabilities, int numVertices, int minDegree) { + std::discrete_distribution distribution(probabilities.begin(), probabilities.end()); + + std::vector degrees(numVertices); + for (int i = 0; i < numVertices; i++) { + degrees[i] = distribution(rng) + minDegree; + } + return degrees; +} + +std::vector GenerateLogNormalDegrees(TRNG& rng, int numVertices, double mu, double sigma, int minDegree, int maxDegree) { + if (maxDegree == -1) { + maxDegree = numVertices - 1; + } + + std::vector probabilities(maxDegree - minDegree + 1); + for (int k = minDegree; k <= maxDegree; k++) { + if (k <= 0) { + probabilities[k - minDegree] = 0.0; + continue; + } + + double x = (double)k; + double logX = std::log(x); + double z = (logX - mu) / sigma; + + // PDF: (1/(x*σ*√(2π))) * exp(-(ln(x)-μ)²/(2σ²)) + probabilities[k - minDegree] = (1.0 / (x * sigma * std::sqrt(2.0 * M_PI))) * std::exp(-0.5 * z * z); + } + + // Not strictly necessary, but makes it easier to inspect 
probabilities + NormalizeProbabilities(probabilities); + return SampleFromPMF(rng, probabilities, numVertices, minDegree); +} + +TRelationGraph GenerateRandomChungLuGraph(TRNG& rng, const std::vector& degrees) { + TRelationGraph graph(degrees.size()); + + double sum = std::accumulate(degrees.begin(), degrees.end(), 0.0); + if (sum == 0) { + return graph; + } + + std::uniform_real_distribution<> distribution(0, 1); + for (ui32 i = 0; i < degrees.size(); ++i) { + for (ui32 j = i + 1; j < degrees.size(); ++j) { + if (distribution(rng) < std::min(1.0, degrees[i] * degrees[j] / sum)) { + graph.Connect(i, j); + } + } + } + + return graph; +} + +static void MakeEvenSum(std::vector& degrees) { + int sum = std::accumulate(degrees.begin(), degrees.end(), 0); + if (sum % 2 == 1) { + auto minIt = std::min_element(degrees.begin(), degrees.end()); + ++*minIt; + } +} + +static bool CheckSatisfiesErdosGallai(std::vector degrees) { + int sum = std::accumulate(degrees.begin(), degrees.end(), 0); + + if (sum % 2 != 0) { + return false; + } + + for (int degree : degrees) { + if (degree < 0 || degree >= static_cast(degrees.size())) { + return false; + } + } + + std::sort(degrees.rbegin(), degrees.rend()); + + uint64_t sumLeft = 0; + for (uint64_t k = 0; k < degrees.size(); ++k) { + sumLeft += degrees[k]; + + uint64_t sumRight = k * (k + 1); + for (uint64_t i = k + 1; i < degrees.size(); ++i) { + sumRight += std::min(k + 1, degrees[i]); + } + + if (sumLeft > sumRight) { + return false; + } + } + + return true; +} + +bool CanBeConnected(const std::vector& degrees) { + int n = degrees.size(); + if (n <= 1) { + return true; + } + + int sum = std::accumulate(degrees.begin(), degrees.end(), 0); + if (sum < 2 * (n - 1)) { + return false; + } + + for (int degree : degrees) { + if (degree == 0) { + return false; + } + } + + return true; +} + +std::vector MakeGraphicConnected(std::vector degrees) { + const i32 MAX_ITERATIONS = 1000; + + for (int& degree : degrees) { + if (degree == 0) { + degree 
= 1; + } + } + + // Cap degrees at n-1 + for (int& degree : degrees) { + degree = std::min(degree, degrees.size() - 1); + } + + MakeEvenSum(degrees); + + int iterations = 0; + while ((!CheckSatisfiesErdosGallai(degrees) || !CanBeConnected(degrees)) && iterations++ < MAX_ITERATIONS) { + std::sort(degrees.begin(), degrees.end(), std::greater()); + + if (!CheckSatisfiesErdosGallai(degrees)) { + // Reduce max, increase min (redistribute) + if (degrees[0] > degrees[degrees.size() - 1] + 1) { + --degrees[0]; + ++degrees[degrees.size() - 1]; + } else { + --degrees[0]; + } + } else if (!CanBeConnected(degrees)) { + // Increase minimum degrees to help connectivity + for (int& degree : degrees) { + ui32 edges = std::accumulate(degrees.begin(), degrees.end(), 0) / 2; + if (degree < 2 && edges + 2 <= degrees.size() * (degrees.size() - 1) / 2) { + ++degree; + } + } + } + + MakeEvenSum(degrees); + } + + return degrees; +} + +TRelationGraph ConstructGraphHavelHakimi(std::vector degrees) { + TRelationGraph graph(degrees.size()); + + std::vector> nodes; + for (uint32_t i = 0; i < degrees.size(); ++i) { + nodes.push_back({degrees[i], i}); + } + + while (true) { + std::sort(nodes.begin(), nodes.end(), std::greater>{}); + + while (!nodes.empty() && nodes.back().first == 0) { + nodes.pop_back(); + } + + if (nodes.empty()) { + break; + } + + auto [degree, u] = nodes[0]; + nodes.erase(nodes.begin()); + + if (degree > static_cast(nodes.size())) { + break; + } + + for (uint32_t i = 0; i < static_cast(degree); ++i) { + uint32_t v = nodes[i].second; + graph.Connect(u, v); + --nodes[i].first; + } + } + + return graph; +} + +void MCMCRandomize(TRNG& rng, TRelationGraph& graph) { + std::uniform_int_distribution<> nodeDist(0, graph.GetN() - 1); + std::uniform_real_distribution<> distribution(0.0, 1.0); + + ui32 numEdges = graph.GetNumEdges(); + ui32 numSwaps = numEdges * log(numEdges); + + auto& adjacency = graph.GetAdjacencyList(); + + const double TEMP_START = 5.0; + const double TEMP_END = 
0.1; + const double CONNECTIVITY_PENALTY = 20.0; + const int MAX_ATTEMPTS = numSwaps * 10; + + ui32 successfulSwaps = 0; + int attempt = 0; + + while (attempt < MAX_ATTEMPTS && (successfulSwaps < numSwaps || !graph.IsConnected())) { + double progress = std::min(1.0, static_cast(attempt) / MAX_ATTEMPTS); + double temperature = TEMP_START * std::pow(TEMP_END / TEMP_START, progress); + + ++attempt; + + int a = nodeDist(rng); + if (adjacency[a].empty()) { + continue; + } + + int bIdx = std::uniform_int_distribution<>(0, adjacency[a].size() - 1)(rng); + int b = adjacency[a][bIdx].Target; + + int c = nodeDist(rng); + if (c == a || c == b || adjacency[c].empty()) { + continue; + } + + int dIdx = std::uniform_int_distribution<>(0, adjacency[c].size() - 1)(rng); + int d = adjacency[c][dIdx].Target; + + if (d == a || d == b || d == c) { + continue; + } + if (graph.HasEdge(a, c) || graph.HasEdge(b, d)) { + continue; + } + + int oldComponents = graph.GetNumComponents(); + + graph.Disconnect(a, b); + graph.Disconnect(c, d); + graph.Connect(a, c); + graph.Connect(b, d); + + int newComponents = graph.GetNumComponents(); + double deltaEnergy = CONNECTIVITY_PENALTY * (newComponents - oldComponents); + + bool accept = distribution(rng) < std::exp(-deltaEnergy / temperature); + + if (accept) { + ++ successfulSwaps; + } else { + graph.Disconnect(a, c); + graph.Disconnect(b, d); + graph.Connect(a, b); + graph.Connect(c, d); + } + } +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/join/kqp_join_topology_generator.h b/ydb/core/kqp/ut/join/kqp_join_topology_generator.h new file mode 100644 index 000000000000..1b7f24b79805 --- /dev/null +++ b/ydb/core/kqp/ut/join/kqp_join_topology_generator.h @@ -0,0 +1,244 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace NKikimr::NKqp { + +using TRNG = TSerializableMT19937; + +class TLexicographicalNameGenerator { +public: + static std::string getName(unsigned id, bool lowerCase = true) { + if 
(id < Base_) { + return std::string(1, fromDigit(id, lowerCase)); + } + + id -= Base_; + + unsigned count = 1; + unsigned step = Base_; + for (; id >= step;) { + id -= step; + step *= step; + count *= 2; + } + + std::string result(count, fromDigit(Base_ - 1, lowerCase)); + return result + fromNumber(id, result.size(), lowerCase); + } + +private: + static std::string fromNumber(unsigned number, unsigned size, bool lowerCase) { + std::string stringified = ""; + for (unsigned i = 0; i < size; ++ i) { + stringified.push_back(fromDigit(number % Base_, lowerCase)); + number /= Base_; + } + + return std::string(stringified.rbegin(), stringified.rend()); + } + + static char fromDigit(unsigned value, bool lowerCase) { + Y_ASSERT(0 <= value && value < Base_); + return (lowerCase ? 'a' : 'A') + value; + } + + static constexpr unsigned Base_ = 'z' - 'a' + 1; +}; + +struct TPitmanYorConfig { + double Alpha; + double Theta; + + void DumpParamsHeader(IOutputStream& os) { + os << "alpha,theta"; + } + + void DumpParams(IOutputStream& os) { + os << Alpha << "," << Theta; + } +}; + +class TTable { +public: + TTable(unsigned numColumns = 0) + : NumColumns_(numColumns) + { + } + + unsigned GetNumColumns() const { + return NumColumns_; + } + +private: + unsigned NumColumns_; +}; + +class TSchema { +public: + TSchema(unsigned numNodes) + : Tables_(numNodes) + { + } + + TSchema(std::vector tables) + : Tables_(std::move(tables)) + { + } + + static TSchema MakeWithEnoughColumns(unsigned numNodes); + + std::string MakeCreateQuery() const; + std::string MakeDropQuery() const; + + TTable& operator[](unsigned index) { + return Tables_[index]; + } + + size_t GetSize() const { + return Tables_.size(); + } + + void Rename(std::vector oldToNew); + +private: + std::vector Tables_; +}; + +class TRelationGraph { +public: + TRelationGraph(unsigned numNodes) + : AdjacencyList_(numNodes) + , Schema_(numNodes) + { + } + + void Connect(unsigned lhs, unsigned rhs); + void Disconnect(unsigned u, unsigned v); 
+ bool HasEdge(unsigned u, unsigned v) const;
+
+ std::string MakeQuery() const;
+
+ ui32 GetNumEdges() const;
+ unsigned GetN() const {
+ return AdjacencyList_.size();
+ }
+
+ std::vector FindComponents() const;
+ int GetNumComponents() const;
+ bool IsConnected() const {
+ return GetNumComponents() == 1;
+ }
+
+ const TSchema& GetSchema() const {
+ return Schema_;
+ }
+
+ std::vector GetDegrees() const;
+
+ // Reorder in connected order, meaning that first N vertices form
+ // a connected subgraph if the whole graph is connected. This is used
+ // to ensure that each JOIN clause only mentions tables that were
+ // already joined (or FROM clause)
+ void ReorderDFS();
+
+ // Update vertex numbering according to oldToNew map, primarily
+ // used to reorder graph in connected subgraphs-first order.
+ void Rename(const std::vector& oldToNew);
+
+ // Update keys for the whole graph according to Pitman-Yor distribution,
+ // where degree is distributed into clusters and each cluster means
+ // that that number of edges joins this particular node with the same key
+ void SetupKeysPitmanYor(TRNG& rng, TPitmanYorConfig config);
+
+ // Dump graph in undirected graphviz dot format. Neato is recommended
+ // for laying out such graphs.
+ void DumpGraph(IOutputStream& os) const;
+
+public:
+ struct TEdge {
+ unsigned Target;
+ unsigned ColumnLHS, ColumnRHS;
+ };
+
+ using TAdjacencyList = std::vector>;
+
+public:
+ TAdjacencyList& GetAdjacencyList() {
+ return AdjacencyList_;
+ }
+
+private:
+ TAdjacencyList AdjacencyList_;
+ TSchema Schema_;
+};
+
+class TSchemaStats {
+public:
+ struct TTableStats {
+ unsigned ByteSize;
+ unsigned RowSize;
+ };
+
+public:
+ TSchemaStats(std::vector stats)
+ : Stats_(std::move(stats))
+ {
+ }
+
+ static TSchemaStats MakeRandom(TRNG& rng, const TSchema& schema, unsigned a, unsigned b);
+
+ std::string ToJSON() const;
+
+private:
+ std::vector Stats_;
+};
+
+// Basic topologies, these all have fixed node layouts (not random)
+TRelationGraph GeneratePath(unsigned numNodes);
+TRelationGraph GenerateStar(unsigned numNodes);
+TRelationGraph GenerateClique(unsigned numNodes);
+
+// Generate a tree from Prufer sequence (each labeled tree has a
+// corresponding unique sequence)
+TRelationGraph GenerateTreeFromPruferSequence(const std::vector& prufer);
+
+// Uniformly random trees based on random Prufer sequence
+TRelationGraph GenerateRandomTree(TRNG& rng, unsigned numNodes);
+
+// Random graph using Chung Lu model that approximates graph with given degrees
+TRelationGraph GenerateRandomChungLuGraph(TRNG& rng, const std::vector& degrees);
+
+// Sample a degree sequence from a given probability distribution
+std::vector SampleFromPMF(
+ TRNG& rng,
+ const std::vector& probabilities,
+ int numVertices, int minDegree);
+
+// Sample a degree sequence from lognormal distribution
+std::vector GenerateLogNormalDegrees(
+ TRNG& rng, int numVertices,
+ double mu = 1.0, double sigma = 0.5,
+ int minDegree = 1, int maxDegree = -1);
+
+// Adjust degree sequence to make it graphic (realizable by simple graph
+// without self-loops and double edges) and check that it's likely possible
+// to make a connected graph with that degree sequence
+// (although this property is not guaranteed)
+std::vector MakeGraphicConnected(std::vector degrees); + +// Deterministically constructs graph for a given degree sequence +TRelationGraph ConstructGraphHavelHakimi(std::vector degrees); + +// Randomize graph using ~E*log(E) l-switches (preserve degrees of all +// verticies) Uses Metropolis-Hastings based acceptance with annealing (to make +// switching edges ergodic and still produce connected graphs) +void MCMCRandomize(TRNG& rng, TRelationGraph& graph); + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/join/kqp_join_topology_ut.cpp b/ydb/core/kqp/ut/join/kqp_join_topology_ut.cpp new file mode 100644 index 000000000000..28a36fc53715 --- /dev/null +++ b/ydb/core/kqp/ut/join/kqp_join_topology_ut.cpp @@ -0,0 +1,603 @@ +#include "kqp_join_topology_generator.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NKqp { + +using namespace NYdb; +using namespace NYdb::NTable; + +Y_UNIT_TEST_SUITE(KqpJoinTopology) { + + std::optional ExplainQuery(NYdb::NQuery::TSession session, const std::string& query) { + auto explainRes = session.ExecuteQuery(query, + NYdb::NQuery::TTxControl::NoTx(), + NYdb::NQuery::TExecuteQuerySettings().ExecMode(NQuery::EExecMode::Explain) + ).ExtractValueSync(); + + if (explainRes.GetStatus() == EStatus::TIMEOUT) { + return std::nullopt; + } + + explainRes.GetIssues().PrintTo(Cout); + if (explainRes.GetStatus() != EStatus::SUCCESS) { + throw std::runtime_error("Couldn't execute query!"); + } + + return *explainRes.GetStats()->GetPlan(); + } + + bool ExecuteQuery(NYdb::NQuery::TSession session, std::string query) { + auto execRes = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + + if (execRes.GetStatus() == EStatus::TIMEOUT) { + return false; + } + + execRes.GetIssues().PrintTo(Cout); + UNIT_ASSERT(execRes.IsSuccess()); + + return true; + } + + 
void JustPrintPlan(const TString& plan) { + NYdb::NConsoleClient::TQueryPlanPrinter queryPlanPrinter( + NYdb::NConsoleClient::EDataFormat::PrettyTable, + /*analyzeMode=*/true, Cout, /*maxWidth=*/0); + + queryPlanPrinter.Print(plan); + } + + std::string ConfigureQuery(const std::string& query, bool enableShuffleElimination = false, unsigned optLevel = 2) { + std::string queryWithShuffleElimination = "PRAGMA ydb.OptShuffleElimination=\""; + queryWithShuffleElimination += enableShuffleElimination ? "true" : "false"; + queryWithShuffleElimination += "\";\n"; + queryWithShuffleElimination += "PRAGMA ydb.MaxDPHypDPTableSize='4294967295';\n"; + queryWithShuffleElimination += "PRAGMA ydb.ShuffleEliminationJoinNumCutoff='" + std::to_string(UINT32_MAX) + "';\n"; + queryWithShuffleElimination += "PRAGMA ydb.CostBasedOptimizationLevel=\"" + std::to_string(optLevel) + "\";\n"; + queryWithShuffleElimination += query; + + return queryWithShuffleElimination; + } + + std::optional BenchmarkExplain(TBenchmarkConfig config, NYdb::NQuery::TSession session, const TString& query) { + std::optional savedPlan = std::nullopt; + std::optional stats = Benchmark(config, [&]() -> bool { + auto plan = ExplainQuery(session, query); + if (!savedPlan) { + savedPlan = plan; + } + + return !!plan; + }); + + if (!stats) { + Cout << "-------------------------------- TIMED OUT -------------------------------\n"; + return std::nullopt; + } + + Y_ASSERT(savedPlan); + JustPrintPlan(*savedPlan); + Cout << "--------------------------------------------------------------------------\n"; + + DumpTimeStatistics(stats->ComputeStatistics(), Cout); + + return stats; + } + + std::optional> BenchmarkShuffleElimination(TBenchmarkConfig config, NYdb::NQuery::TSession session, std::string resultType, const TString& query) { + std::map results; + + std::optional withoutCBO; + if (resultType.contains("0")) { + Cout << "--------------------------------- W/O CBO --------------------------------\n"; + + withoutCBO = 
BenchmarkExplain(config, session, ConfigureQuery(query, /*enableShuffleElimination=*/false, /*optLevel=*/0)); + if (!withoutCBO) { + return std::nullopt; + } + + results.emplace("0", *withoutCBO); + if (withoutCBO->GetMax() > config.SingleRunTimeout || resultType == "0") { + return results; + } + } + + std::optional withoutShuffleElimination; + if (resultType.contains("CBO")) { + Cout << "--------------------------------- CBO-SE ---------------------------------\n"; + withoutShuffleElimination = BenchmarkExplain(config, session, ConfigureQuery(query, /*enableShuffleElimination=*/false, /*optLevel=*/2)); + + if (resultType.contains("0")) { + results.emplace("CBO-0", (*withoutShuffleElimination - *withoutCBO).Filter([](double value) { return value > 0; })); + } + + if (!withoutShuffleElimination) { + return std::nullopt; + } + + results.emplace("CBO", *withoutShuffleElimination); + if (withoutShuffleElimination->GetMax() > config.SingleRunTimeout || resultType == "CBO" || resultType == "CBO-0") { + return results; + } + } + + std::optional withShuffleElimination; + if (resultType.contains("SE")) { + Cout << "--------------------------------- CBO+SE ---------------------------------\n"; + withShuffleElimination = BenchmarkExplain(config, session, ConfigureQuery(query, /*enableShuffleElimination=*/true, /*optLevel=*/2)); + + if (resultType.contains("0")) { + results.emplace("SE-0", (*withShuffleElimination - *withoutCBO).Filter([](double value) { return value > 0; })); + } + + if (!withShuffleElimination) { + return std::nullopt; + } + + results.emplace("SE", *withShuffleElimination); + if (withShuffleElimination->GetMax() > config.SingleRunTimeout || resultType == "SE" || resultType == "SE-0") { + return results; + } + } + + Cout << "--------------------------------------------------------------------------\n"; + + results.emplace("SE-div-CBO", *withShuffleElimination / *withoutShuffleElimination); + if (resultType.contains("0")) { + auto& adjustedWithTime = 
results.at("SE-0"); + auto& adjustedWithoutTime = results.at("CBO-0"); + + if (adjustedWithTime.ComputeStatistics().Median > adjustedWithoutTime.ComputeStatistics().Median) { + auto adjustedRatio = adjustedWithTime / adjustedWithoutTime; + results.emplace("SE-0-div-CBO-0", adjustedRatio); + } + } + + return results; + } + + std::unique_ptr GetCBOTestsYDB(TString stats, TDuration compilationTimeout) { + TVector settings; + + Y_ASSERT(!stats.empty()); + + NKikimrKqp::TKqpSetting setting; + setting.SetName("OptOverrideStatistics"); + setting.SetValue(stats); + settings.push_back(setting); + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableConstantFolding(true); + appConfig.MutableTableServiceConfig()->SetEnableOrderOptimizaionFSM(true); + appConfig.MutableTableServiceConfig()->SetCompileTimeoutMs(compilationTimeout.MilliSeconds()); + + TKikimrSettings serverSettings(appConfig); + serverSettings.SetWithSampleTables(false); + serverSettings.SetKqpSettings(settings); + + return std::make_unique(serverSettings); + } + + std::optional> + BenchmarkShuffleEliminationOnTopology(TBenchmarkConfig config, NYdb::NQuery::TSession session, std::string resultType, TRelationGraph graph) { + Cout << "================================= GRAPH ==================================\n"; + graph.DumpGraph(Cout); + + Cout << "================================= PREPARE ================================\n"; + auto creationQuery = graph.GetSchema().MakeCreateQuery(); + Cout << creationQuery; + if (!ExecuteQuery(session, creationQuery)) { + return std::nullopt; + } + + Y_SCOPE_EXIT(&) { + Cout << "================================= FINALIZE ===============================\n"; + auto deletionQuery = graph.GetSchema().MakeDropQuery(); + Cout << deletionQuery; + + bool deletionSucceeded = ExecuteQuery(session, deletionQuery); + UNIT_ASSERT_C(deletionSucceeded, "Table deletion timeouted, can't proceed!"); + Cout << 
"==========================================================================\n"; + }; + + Cout << "================================= BENCHMARK ==============================\n"; + TString query = graph.MakeQuery(); + Cout << query; + + return BenchmarkShuffleElimination(config, session, resultType, query); + } + + template + void OverrideWithArg(std::string key, TArgs args, auto& value) { + if (args.HasArg(key)) { + value = args.GetArg(key).GetValue(); + } + } + + void OverrideRepeatedTestConfig(std::string prefix, TArgs args, TRepeatedTestConfig& config) { + OverrideWithArg(prefix + ".MinRepeats", args, config.MinRepeats); + OverrideWithArg(prefix + ".MaxRepeats", args, config.MaxRepeats); + OverrideWithArg(prefix + ".Timeout", args, config.Timeout); + } + + TBenchmarkConfig GetBenchmarkConfig(TArgs args, std::string prefix = "config") { + TBenchmarkConfig config = /*default=*/{ + .Warmup = { + .MinRepeats = 1, + .MaxRepeats = 5, + .Timeout = 1'000'000'000, + }, + + .Bench = { + .MinRepeats = 10, + .MaxRepeats = 30, + .Timeout = 10'000'000'000, + }, + + .SingleRunTimeout = 20'000'000'000, + .MADThreshold = 0.05 + }; + + OverrideRepeatedTestConfig(prefix + ".Warmup", args, config.Warmup); + OverrideRepeatedTestConfig(prefix + ".Bench", args, config.Bench); + OverrideWithArg(prefix + ".MADThreshold", args, config.MADThreshold); + OverrideWithArg(prefix + ".SingleRunTimeout", args, config.SingleRunTimeout); + + return config; + } + + void DumpBenchmarkConfig(IOutputStream& os, TBenchmarkConfig config) { + os << "config = {\n"; + os << " .Warmup = {\n"; + os << " .MinRepeats = " << config.Warmup.MinRepeats << ",\n"; + os << " .MaxRepeats = " << config.Warmup.MaxRepeats << ",\n"; + os << " .Timeout = " << TimeFormatter::Format(config.Warmup.Timeout) << "\n"; + os << " },\n\n"; + os << " .Bench = {\n"; + os << " .MinRepeats = " << config.Bench.MinRepeats << ",\n"; + os << " .MaxRepeats = " << config.Bench.MaxRepeats << ",\n"; + os << " .Timeout = " << 
TimeFormatter::Format(config.Bench.Timeout) << "\n"; + os << " },\n\n"; + os << " .SingleRunTimeout = " << TimeFormatter::Format(config.SingleRunTimeout) << ",\n"; + os << " .MADThreshold = " << config.MADThreshold << "\n"; + os << "}\n"; + } + + TPitmanYorConfig GetPitmanYorConfig(TArgs args) { + return TPitmanYorConfig{ + .Alpha = args.GetArgOrDefault("alpha", "0.5").GetValue(), + .Theta = args.GetArgOrDefault("theta", "1.0").GetValue() + }; + } + + struct TBenchState { + ui32 Seed; + ui32 TopologyCounter; + ui32 MCMCCounter; + ui32 KeyCounter; + + std::string toHex() const { + std::stringstream ss; + ss << std::hex << std::setfill('0'); + ss << std::setw(8) << Seed + << std::setw(8) << TopologyCounter + << std::setw(8) << MCMCCounter + << std::setw(8) << KeyCounter; + return ss.str(); + } + + static TBenchState fromHex(const std::string& hex) { + TBenchState state; + ui32* parts[] = {&state.Seed, &state.TopologyCounter, &state.MCMCCounter, &state.KeyCounter}; + for (ui32 i = 0; i < Y_ARRAY_SIZE(parts); ++ i) { + *parts[i] = std::stoul(hex.substr(i * 8, 8), nullptr, 16); + } + + return state; + } + }; + + struct TTestContext { + std::unique_ptr Runner; + NYdb::NQuery::TQueryClient QueryClient; + NYdb::NQuery::TSession Session; + + std::optional State; + TRNG RNG; + + std::string OutputDir; + std::map Streams = {}; + }; + + TTestContext CreateTestContext(TArgs args, std::string outputDir = "") { + std::random_device randomDevice; + TRNG rng(randomDevice() % UINT32_MAX); + + std::optional state; + if (args.HasArg("state")) { + state = TBenchState::fromHex(args.GetString("state")); + rng.seed(state->Seed); + } + + auto numTablesRanged = args.GetArg("N"); + + TSchema fullSchema = TSchema::MakeWithEnoughColumns(numTablesRanged.GetLast()); + TString stats = TSchemaStats::MakeRandom(rng, fullSchema, 7, 10).ToJSON(); + + auto kikimr = GetCBOTestsYDB(stats, TDuration::Seconds(10)); + auto db = kikimr->GetQueryClient(); + auto session = 
db.GetSession().GetValueSync().GetSession(); + + return {std::move(kikimr), std::move(db), std::move(session), state, std::move(rng), outputDir}; + } + + std::string WriteGraph(TTestContext& ctx, ui32 graphID, const TRelationGraph& graph) { + if (ctx.OutputDir.empty()) { + return ""; + } + + std::string graphDir = ctx.OutputDir + "/graphs"; + if (!std::filesystem::exists(graphDir)) { + std::filesystem::create_directories(graphDir); + } + + std::string graphName = TLexicographicalNameGenerator::getName(graphID); + + TString filename = graphDir + "/" + graphName + ".dot"; + auto os = TUnbufferedFileOutput(filename); + graph.DumpGraph(os); + + return graphName; + } + + void WriteAllStats(TTestContext& ctx, const std::string& prefix, + const std::string& header, const std::string& params, + const std::map& stats) { + if (ctx.OutputDir.empty()) { + return; + } + + if (!std::filesystem::exists(ctx.OutputDir)) { + std::filesystem::create_directories(ctx.OutputDir); + } + + for (const auto& [key, stat] : stats) { + std::string name = prefix + key; + + if (!ctx.Streams.contains(name)) { + auto filename = TString(ctx.OutputDir + "/" + name + ".csv"); + auto& os = ctx.Streams.emplace(name, TUnbufferedFileOutput(filename)).first->second; + os << header << "\n"; + } + + auto& os = ctx.Streams.find(name)->second; + + os << params << ","; + stat.ComputeStatistics().ToCSV(os); + os << "\n"; + } + } + + void AccumulateAllStats(std::map& cummulative, + const std::map& stats) { + for (auto& [key, stat] : stats) { + if (!cummulative.contains(key)) { + cummulative.emplace(key, stat); + } + + cummulative.find(key)->second.Merge(stat); + } + } + + using TTopologyFunction = TRelationGraph (*)(TRNG& rng, ui32 n, double mu, double sigma); + template + TTopologyFunction GetTrivialTopology() { + return []([[maybe_unused]] TRNG& rng, ui32 n, [[maybe_unused]] double mu, [[maybe_unused]] double sigma) { + return TTrivialTopologyGenerator(n); + }; + } + + TTopologyFunction GetTopology(std::string 
topologyName) { + if (topologyName == "star") { + return GetTrivialTopology(); + } + + if (topologyName == "path") { + return GetTrivialTopology(); + } + + if (topologyName == "clique") { + return GetTrivialTopology(); + } + + if (topologyName == "random-tree") { + return []([[maybe_unused]] TRNG& rng, ui32 n, [[maybe_unused]] double mu, [[maybe_unused]] double sigma) { + return GenerateRandomTree(rng, n); + }; + } + + if (topologyName == "mcmc") { + return []([[maybe_unused]] TRNG& rng, ui32 n, [[maybe_unused]] double mu, [[maybe_unused]] double sigma) { + Cout << "================================= METRICS ================================\n"; + auto sampledDegrees = GenerateLogNormalDegrees(rng, n, mu, sigma); + Cout << "sampled degrees: " << JoinSeq(", ", sampledDegrees) << "\n"; + + auto graphicDegrees = MakeGraphicConnected(sampledDegrees); + Cout << "graphic degrees: " << JoinSeq(", ", graphicDegrees) << "\n"; + + auto initialGraph = ConstructGraphHavelHakimi(graphicDegrees); + return initialGraph; + }; + } + + if (topologyName == "chung-lu") { + return []([[maybe_unused]] TRNG& rng, ui32 n, [[maybe_unused]] double mu, [[maybe_unused]] double sigma) { + Cout << "================================= METRICS ================================\n"; + auto initialDegrees = GenerateLogNormalDegrees(rng, n, mu, sigma); + Cout << "initial degrees: " << JoinSeq(", ", initialDegrees) << "\n"; + + auto initialGraph = GenerateRandomChungLuGraph(rng, initialDegrees); + return initialGraph; + }; + } + + throw std::runtime_error("Unknown topology: '" + topologyName + "'"); + } + + void RunBenches(TTestContext& ctx, TBenchmarkConfig config, TArgs args) { + std::string resultType = args.GetStringOrDefault("result", "SE"); + + ui64 topologyGenerationRepeats = args.GetArgOrDefault("gen-n", "1").GetValue(); + ui64 mcmcRepeats = args.GetArgOrDefault("mcmc-n", "1").GetValue(); + ui64 equiJoinKeysGenerationRepeats = args.GetArgOrDefault("keys-n", "1").GetValue(); + bool reorder = 
args.GetArgOrDefault("reorder", "1").GetValue() != 0; + + std::string topologyName = args.GetStringOrDefault("type", "star"); + auto generateTopology = GetTopology(topologyName); + + std::string headerAggregate = "idx,N,alpha,theta,sigma,mu," + TComputedStatistics::GetCSVHeader(); + std::string header = "idx,state,graph_name,aggregate_" + headerAggregate; + + ui32 idx = 0; + ui32 aggregateIdx = 0; + + for (double alpha : args.GetArgOrDefault("alpha", "0.5")) { + for (double theta : args.GetArgOrDefault("theta", "1.0")) { + for (double sigma : args.GetArgOrDefault("sigma", "0.5")) { + for (double mu : args.GetArgOrDefault("mu", "1.0")) { + for (ui64 n : args.GetArg("N")) { + std::stringstream commonParams; + commonParams << (aggregateIdx ++) + << "," << n + << "," << alpha << "," << theta + << "," << sigma << "," << mu; + + std::map aggregate; + for (ui64 i = 0; i < topologyGenerationRepeats; ++ i) { + Cout << "\n\n\n"; + + if (ctx.State) { + ctx.RNG.Forward(ctx.State->TopologyCounter); + } + + ui32 counterTopology = ctx.RNG.GetCounter(); + auto initialGraph = generateTopology(ctx.RNG, n, mu, sigma); + + for (ui64 j = 0; j < mcmcRepeats; ++ j) { + TRelationGraph graph = initialGraph; + + if (ctx.State) { + ctx.RNG.Forward(ctx.State->MCMCCounter); + } + + ui32 counterMCMC = ctx.RNG.GetCounter(); + if (topologyName == "mcmc") { + MCMCRandomize(ctx.RNG, graph); + } + + for (ui64 k = 0; k < equiJoinKeysGenerationRepeats; ++ k) { + if (ctx.State) { + ctx.RNG.Forward(ctx.State->KeyCounter); + } + + ui32 counterKeys = ctx.RNG.GetCounter(); + graph.SetupKeysPitmanYor(ctx.RNG, TPitmanYorConfig{.Alpha = alpha, .Theta = theta}); + + std::string state = TBenchState(ctx.RNG.GetSeed(), counterTopology, counterMCMC, counterKeys).toHex(); + + Cout << "\n\n"; + Cout << "Test #" << idx << "\n"; + Cout << "Reproduce: TOPOLOGY='"; + Cout << "type=" << topologyName << "; " + << "N=" << n << "; " + << "alpha=" << alpha << "; " + << "theta=" << theta << "; "; + + if (topologyName == 
"mcmc" || topologyName == "chung-lu") { + Cout << "sigma=" << sigma << "; " + << "mu=" << mu << "; "; + } + + if (args.HasArg("reorder")) { + Cout << "reorder=" << reorder << "; "; + } + + Cout << "state=" << state << "'\n"; + + try { + if (reorder) { + Cout << "================================= GRAPH BEFORE REODERING =================\n"; + graph.DumpGraph(Cout); + graph.ReorderDFS(); + } + + auto result = BenchmarkShuffleEliminationOnTopology(config, ctx.Session, resultType, graph); + if (!result) { + goto stop; + } + + AccumulateAllStats(aggregate, *result); + std::string graphName = WriteGraph(ctx, idx, graph); + + std::stringstream params; + params << idx ++ << "," << state << "," << graphName << "," << commonParams.str(); + WriteAllStats(ctx, "", header, params.str(), *result); + } catch (std::exception &exc) { + Cout << "Skipped run: " << exc.what() << "\n"; + continue; + } + + if (ctx.State) { + // We are running in reproducibility mode, stop immediately after case is reproduced + return; + } + } + } + } + + WriteAllStats(ctx, "aggregate-", headerAggregate, commonParams.str(), aggregate); + } + stop:; + } + } + } + } + } + + Y_UNIT_TEST(Benchmark) { + TArgs args{GetTestParam("TOPOLOGY")}; + if (!args.HasArg("N")) { + // prevent this test from launching non-interactively + return; + } + + auto config = GetBenchmarkConfig(args); + DumpBenchmarkConfig(Cout, config); + + TTestContext ctx = CreateTestContext(args, GetTestParam("SAVE_DIR")); + + RunBenches(ctx, config, args); + } + +} // Y_UNIT_TEST_SUITE(KqpJoinTopology) + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/join/ya.make b/ydb/core/kqp/ut/join/ya.make index 93e27c09c801..50e8c2ea9070 100644 --- a/ydb/core/kqp/ut/join/ya.make +++ b/ydb/core/kqp/ut/join/ya.make @@ -24,6 +24,9 @@ SRCS( kqp_index_lookup_join_ut.cpp kqp_join_ut.cpp kqp_join_order_ut.cpp + kqp_join_topology_generator.cpp + kqp_join_topology_ut.cpp + kqp_benches_ut.cpp ) PEERDIR( diff --git 
a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp index 8029d2455361..9d24f75ce534 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp +++ b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.cpp @@ -314,7 +314,7 @@ class TOptimizerNativeNew: public IOptimizerNew { public: TOptimizerNativeNew( IProviderContext& ctx, - const TCBOSettings &optimizerSettings, + const TCBOSettings& optimizerSettings, TExprContext& exprCtx, bool enableShuffleElimination, TSimpleSharedPtr orderingsFSM, @@ -388,7 +388,7 @@ class TOptimizerNativeNew: public IOptimizerNew { YqlIssue( {}, TIssuesIds::CBO_ENUM_LIMIT_REACHED, "Cost Based Optimizer could not be applied to this query: " - "Enumeration is too large, use PRAGMA MaxDPHypDPTableSize='4294967295' to disable the limitation" + "Enumeration is too large, use PRAGMA ydb.MaxDPHypDPTableSize='4294967295' to disable the limitation" ) ); ComputeStatistics(joinTree, this->Pctx); @@ -516,7 +516,7 @@ class TOptimizerNativeNew: public IOptimizerNew { IOptimizerNew* MakeNativeOptimizerNew( IProviderContext& pctx, - const TCBOSettings &settings, + const TCBOSettings& settings, TExprContext& ectx, bool enableShuffleElimination, TSimpleSharedPtr orderingsFSM, diff --git a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h index 9ba43bceee26..09b8cc919387 100644 --- a/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h +++ b/ydb/library/yql/dq/opt/dq_opt_join_cost_based.h @@ -50,7 +50,7 @@ void CollectInterestingOrderingsFromJoinTree( IOptimizerNew* MakeNativeOptimizerNew( IProviderContext& ctx, - const TCBOSettings &settings, + const TCBOSettings& settings, TExprContext& ectx, bool enableShuffleElimination, TSimpleSharedPtr orderingsFSM = nullptr,