From 3f5f1a215f763af649813b19fd0e9c0fad649fca Mon Sep 17 00:00:00 2001 From: Wei He Date: Tue, 4 Jun 2024 10:04:54 -0700 Subject: [PATCH] Test NestedLoopJoin without filter in join fuzzer (#9923) Summary: A correctness bug was found recently in NestedLoopJoin without filter (https://github.com/facebookincubator/velox/pull/9892). So this diff extends join fuzzer to cover this case with 10% chance. This diff also makes the build side input to contain rows that do not match the probe side. Reviewed By: xiaoxmeng Differential Revision: D57761514 --- velox/exec/fuzzer/JoinFuzzer.cpp | 301 +++++++++++++++++++++++-------- 1 file changed, 227 insertions(+), 74 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 79e88a34b06e..34c8bf0bd118 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -143,13 +143,17 @@ class JoinFuzzer { const std::vector& buildInput, const std::vector& outputColumns); + // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. + // If withFilter is true, uses the equality filter between probeKeys and + // buildKeys as the join filter. Uses empty join filter otherwise. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( core::JoinType joinType, const std::vector& probeKeys, const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + bool withFilter = true); // Makes the default query plan with table scan as inputs for both probe and // build sides. @@ -174,6 +178,9 @@ class JoinFuzzer { const std::vector& buildSplits, const std::vector& outputColumns); + // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan + // nodes. If withFilter is true, uses the equality filter between probeKeys + // and buildKeys as the join filter. Uses empty join filter otherwise. 
JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlanWithTableScan( core::JoinType joinType, const RowTypePtr& probeType, @@ -182,7 +189,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + bool withFilter = true); void makeAlternativePlans( const core::PlanNodePtr& plan, @@ -250,11 +258,23 @@ class JoinFuzzer { RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); + template std::optional computeDuckDbResult( const std::vector& probeInput, const std::vector& buildInput, const core::PlanNodePtr& plan); + // Generates and executes plans using NestedLoopJoin without filters. The + // result is compared to DuckDB. Returns the result vector of the cross + // product. + RowVectorPtr testCrossProduct( + const std::string& tableDir, + core::JoinType joinType, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput); + int32_t randInt(int32_t min, int32_t max) { return boost::random::uniform_int_distribution(min, max)(rng_); } @@ -395,20 +415,24 @@ std::vector JoinFuzzer::generateBuildInput( // probeInput. // To ensure there are some matches, sample with replacement 10% of probe join - // keys and use these as build keys. - // TODO Add a few random rows as well. + // keys and use these as 80% of build keys. The rest of the build keys are randomly + // generated. This allows the build side to have unmatched rows that should + // appear in right join and full join. std::vector input; for (const auto& probe : probeInput) { - auto numRows = 1 + probe->size() / 10; - auto build = BaseVector::create(rowType, numRows, probe->pool()); + auto numRows = 1 + probe->size() / 8; + auto build = vectorFuzzer_.fuzzRow(rowType, numRows, false); // Pick probe side rows to copy. 
std::vector rowNumbers(numRows); + SelectivityVector rows(numRows, false); for (auto i = 0; i < numRows; ++i) { - rowNumbers[i] = randInt(0, probe->size() - 1); + if (vectorFuzzer_.coinToss(0.8) && probe->size() > 0) { + rowNumbers[i] = randInt(0, probe->size() - 1); + rows.setValid(i, true); + } } - SelectivityVector rows(numRows); for (auto i = 0; i < probeKeys.size(); ++i) { build->childAt(i)->resize(numRows); build->childAt(i)->copy(probe->childAt(i).get(), rows, rowNumbers.data()); @@ -584,6 +608,7 @@ core::PlanNodePtr tryFlipJoinSides(const core::NestedLoopJoinNode& joinNode) { joinNode.outputType()); } +template std::optional JoinFuzzer::computeDuckDbResult( const std::vector& probeInput, const std::vector& buildInput, @@ -600,7 +625,7 @@ std::optional JoinFuzzer::computeDuckDbResult( queryRunner.createTable("t", probeInput); queryRunner.createTable("u", buildInput); - auto* joinNode = dynamic_cast(plan.get()); + auto* joinNode = dynamic_cast(plan.get()); VELOX_CHECK_NOT_NULL(joinNode); const auto joinKeysToSql = [](auto keys) { @@ -614,68 +639,90 @@ std::optional JoinFuzzer::computeDuckDbResult( return out.str(); }; - const auto equiClausesToSql = [](auto joinNode) { - std::stringstream out; - for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { - if (i > 0) { - out << " AND "; - } - out << joinNode->leftKeys()[i]->name() << " = " - << joinNode->rightKeys()[i]->name(); - } - return out.str(); - }; - const auto& outputNames = plan->outputType()->names(); - std::stringstream sql; - if (joinNode->isLeftSemiProjectJoin()) { - sql << "SELECT " - << folly::join(", ", outputNames.begin(), --outputNames.end()); - } else { - sql << "SELECT " << folly::join(", ", outputNames); - } - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kFull: - sql << " 
FROM t FULL OUTER JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kLeftSemiFilter: - if (joinNode->leftKeys().size() > 1) { - return std::nullopt; - } - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u)"; - break; - case core::JoinType::kLeftSemiProject: - if (joinNode->isNullAware()) { - sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " - << joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t"; - } else { - sql << ", EXISTS (SELECT * FROM u WHERE " << equiClausesToSql(joinNode) - << ") FROM t"; + if constexpr (std::is_same_v) { + const auto equiClausesToSql = [](auto joinNode) { + std::stringstream out; + for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { + if (i > 0) { + out << " AND "; + } + out << joinNode->leftKeys()[i]->name() << " = " + << joinNode->rightKeys()[i]->name(); } - break; - case core::JoinType::kAnti: - if (joinNode->isNullAware()) { + return out.str(); + }; + + if (joinNode->isLeftSemiProjectJoin()) { + sql << "SELECT " + << folly::join(", ", outputNames.begin(), --outputNames.end()); + } else { + sql << "SELECT " << folly::join(", ", outputNames); + } + + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeftSemiFilter: + if (joinNode->leftKeys().size() > 1) { + return std::nullopt; + } sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) << " FROM u)"; - } else { - sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " - << 
equiClausesToSql(joinNode) << ")"; - } - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); + break; + case core::JoinType::kLeftSemiProject: + if (joinNode->isNullAware()) { + sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t"; + } else { + sql << ", EXISTS (SELECT * FROM u WHERE " + << equiClausesToSql(joinNode) << ") FROM t"; + } + break; + case core::JoinType::kAnti: + if (joinNode->isNullAware()) { + sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) + << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " FROM u)"; + } else { + sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " + << equiClausesToSql(joinNode) << ")"; + } + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + } else { + // Nested loop join without filter. + VELOX_CHECK( + joinNode->joinCondition() == nullptr, + "This code path should be called only for nested loop join without filter"); + const std::string joinCondition{"(1 = 1)"}; + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER JOIN u ON " << joinCondition; + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << joinCondition; + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } } return queryRunner.execute(sql.str(), plan->outputType()); @@ -853,9 +900,11 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + bool withFilter) { auto planNodeIdGenerator = std::make_shared(); - const auto filter = makeJoinFilter(probeKeys, buildKeys); + 
const std::string filter = + withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -944,6 +993,75 @@ void JoinFuzzer::shuffleJoinKeys( } } +RowVectorPtr JoinFuzzer::testCrossProduct( + const std::string& tableDir, + core::JoinType joinType, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput) { + VELOX_CHECK_GT(probeInput.size(), 0); + VELOX_CHECK_GT(buildInput.size(), 0); + + const auto probeType = asRowType(probeInput[0]->type()); + const auto buildType = asRowType(buildInput[0]->type()); + auto outputColumns = + concat(asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())) + ->names(); + + auto plan = makeNestedLoopJoinPlan( + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + /*withFilter*/ false); + const auto expected = execute(plan, /*injectSpill=*/false); + + // If OOM injection is not enabled verify the results against DuckDB. 
+ if (!FLAGS_enable_oom_injection) { + if (auto duckDbResult = computeDuckDbResult( + probeInput, buildInput, plan.plan)) { + VELOX_CHECK( + assertEqualResults( + duckDbResult.value(), plan.plan->outputType(), {expected}), + "Velox and DuckDB results don't match"); + } + } + + std::vector altPlans; + if (isTableScanSupported(probeInput[0]->type()) && + isTableScanSupported(buildInput[0]->type())) { + std::vector probeScanSplits = + makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); + std::vector buildScanSplits = + makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + + altPlans.push_back(makeNestedLoopJoinPlanWithTableScan( + joinType, + probeType, + buildType, + probeKeys, + buildKeys, + probeScanSplits, + buildScanSplits, + outputColumns, + /*withFilter*/ false)); + } + addFlippedJoinPlan(plan.plan, altPlans); + + for (const auto& altPlan : altPlans) { + auto actual = execute(altPlan, /*injectSpill=*/false); + if (actual != nullptr && expected != nullptr) { + VELOX_CHECK( + assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } + } + return expected; +} + void JoinFuzzer::verify(core::JoinType joinType) { const bool nullAware = isNullAwareSupported(joinType) && vectorFuzzer_.coinToss(0.5); @@ -974,6 +1092,32 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } + const auto tableScanDir = exec::test::TempDirectoryPath::create(); + + // Test cross product without filter with 10% chance. Avoid testing cross + // product if input size is too large. 
+ if ((core::isInnerJoin(joinType) || core::isLeftJoin(joinType) || + core::isFullJoin(joinType)) && + FLAGS_batch_size * FLAGS_num_batches <= 500) { + if (vectorFuzzer_.coinToss(0.1)) { + auto result = testCrossProduct( + tableScanDir->getPath(), + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput); + auto flatResult = testCrossProduct( + tableScanDir->getPath(), + joinType, + probeKeys, + buildKeys, + flatProbeInput, + flatBuildInput); + assertEqualResults({result}, {flatResult}); + } + } + auto outputColumns = (core::isLeftSemiProjectJoin(joinType) || core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) @@ -1009,8 +1153,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { // If OOM injection is not enabled verify the results against DuckDB. if (!FLAGS_enable_oom_injection) { - if (auto duckDbResult = - computeDuckDbResult(probeInput, buildInput, defaultPlan.plan)) { + if (auto duckDbResult = computeDuckDbResult( + probeInput, buildInput, defaultPlan.plan)) { VELOX_CHECK( assertEqualResults( duckDbResult.value(), defaultPlan.plan->outputType(), {expected}), @@ -1032,7 +1176,6 @@ void JoinFuzzer::verify(core::JoinType joinType) { makeAlternativePlans( defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans); - const auto tableScanDir = exec::test::TempDirectoryPath::create(); addPlansWithTableScan( tableScanDir->getPath(), joinType, @@ -1129,12 +1272,14 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + bool withFilter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - const auto filter = makeJoinFilter(probeKeys, buildKeys); + const std::string filter = + withFilter ? 
makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .tableScan(probeType) @@ -1206,10 +1351,18 @@ void JoinFuzzer::addPlansWithTableScan( const int32_t numGroups = randInt(1, probeScanSplits.size()); const std::vector groupedProbeScanSplits = generateSplitsWithGroup( - tableDir, numGroups, /*isProbe=*/true, probeKeys.size(), probeInput); + tableDir, + numGroups, + /*isProbe=*/true, + probeKeys.size(), + probeInput); const std::vector groupedBuildScanSplits = generateSplitsWithGroup( - tableDir, numGroups, /*isProbe=*/false, buildKeys.size(), buildInput); + tableDir, + numGroups, + /*isProbe=*/false, + buildKeys.size(), + buildInput); for (const auto& planWithTableScan : plansWithTableScan) { altPlans.push_back(planWithTableScan);