diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index f85c4cc26b5e..ef8ebe08aae1 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -148,13 +148,17 @@ class JoinFuzzer { const std::vector& buildInput, const std::vector& outputColumns); + // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. + // If withFilter is true, uses the equality filter between probeKeys and + // buildKeys as the join filter. Uses empty join filter otherwise. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( core::JoinType joinType, const std::vector& probeKeys, const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + bool withFilter = true); // Makes the default query plan with table scan as inputs for both probe and // build sides. @@ -183,6 +187,9 @@ class JoinFuzzer { buildSplits, const std::vector& outputColumns); + // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan + // nodes. If withFilter is true, uses the equality filter between probeKeys + // and buildKeys as the join filter. Uses empty join filter otherwise. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlanWithTableScan( core::JoinType joinType, const RowTypePtr& probeType, @@ -193,7 +200,8 @@ class JoinFuzzer { probeSplits, const std::vector>& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + bool withFilter = true); void makeAlternativePlans( const core::PlanNodePtr& plan, @@ -230,6 +238,16 @@ class JoinFuzzer { const std::vector& probeKeys, const std::vector& buildKeys); + // Writes probeInput and buildInput to files under tableDir and returns a + // tuple of (probeScanSplits, buildScanSplits). 
+ std::tuple< + std::vector>, + std::vector>> + generateInputSplits( + const std::string& tableDir, + const std::vector& probeInput, + const std::vector& buildInput); + void shuffleJoinKeys( std::vector& probeKeys, std::vector& buildKeys); @@ -261,11 +279,22 @@ class JoinFuzzer { RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); + template std::optional computeDuckDbResult( const std::vector& probeInput, const std::vector& buildInput, const core::PlanNodePtr& plan); + // Generates and executes plans using NestedLoopJoin without filters. The + // result is compared to DuckDB. + void testCrossProduct( + const std::string& tableDir, + core::JoinType joinType, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput); + int32_t randInt(int32_t min, int32_t max) { return boost::random::uniform_int_distribution(min, max)(rng_); } @@ -406,20 +435,23 @@ std::vector JoinFuzzer::generateBuildInput( // probeInput. // To ensure there are some matches, sample with replacement 10% of probe join - // keys and use these as build keys. - // TODO Add a few random rows as well. + // keys and use these as 80% of build keys. The rest of the build keys are randomly + // generated. std::vector input; for (const auto& probe : probeInput) { - auto numRows = 1 + probe->size() / 10; - auto build = BaseVector::create(rowType, numRows, probe->pool()); + auto numRows = 1 + probe->size() / 8; + auto build = vectorFuzzer_.fuzzRow(rowType, numRows, false); // Pick probe side rows to copy. 
std::vector rowNumbers(numRows); + SelectivityVector rows(numRows, false); for (auto i = 0; i < numRows; ++i) { - rowNumbers[i] = randInt(0, probe->size() - 1); + if (vectorFuzzer_.coinToss(0.8) && probe->size() > 0) { + rowNumbers[i] = randInt(0, probe->size() - 1); + rows.setValid(i, true); + } } - SelectivityVector rows(numRows); for (auto i = 0; i < probeKeys.size(); ++i) { build->childAt(i)->resize(numRows); build->childAt(i)->copy(probe->childAt(i).get(), rows, rowNumbers.data()); @@ -632,6 +664,7 @@ bool containsUnsupportedTypes(const TypePtr& type) { containsType(type, INTERVAL_DAY_TIME()); } +template std::optional JoinFuzzer::computeDuckDbResult( const std::vector& probeInput, const std::vector& buildInput, @@ -648,7 +681,7 @@ std::optional JoinFuzzer::computeDuckDbResult( queryRunner.createTable("t", probeInput); queryRunner.createTable("u", buildInput); - auto* joinNode = dynamic_cast(plan.get()); + auto* joinNode = dynamic_cast(plan.get()); VELOX_CHECK_NOT_NULL(joinNode); const auto joinKeysToSql = [](auto keys) { @@ -662,68 +695,90 @@ std::optional JoinFuzzer::computeDuckDbResult( return out.str(); }; - const auto equiClausesToSql = [](auto joinNode) { - std::stringstream out; - for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { - if (i > 0) { - out << " AND "; - } - out << joinNode->leftKeys()[i]->name() << " = " - << joinNode->rightKeys()[i]->name(); - } - return out.str(); - }; - const auto& outputNames = plan->outputType()->names(); - std::stringstream sql; - if (joinNode->isLeftSemiProjectJoin()) { - sql << "SELECT " - << folly::join(", ", outputNames.begin(), --outputNames.end()); - } else { - sql << "SELECT " << folly::join(", ", outputNames); - } - switch (joinNode->joinType()) { - case core::JoinType::kInner: - sql << " FROM t INNER JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kLeft: - sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kFull: - sql << " FROM t 
FULL OUTER JOIN u ON " << equiClausesToSql(joinNode); - break; - case core::JoinType::kLeftSemiFilter: - if (joinNode->leftKeys().size() > 1) { - return std::nullopt; - } - sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) - << " FROM u)"; - break; - case core::JoinType::kLeftSemiProject: - if (joinNode->isNullAware()) { - sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " - << joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t"; - } else { - sql << ", EXISTS (SELECT * FROM u WHERE " << equiClausesToSql(joinNode) - << ") FROM t"; + if constexpr (std::is_same_v) { + const auto equiClausesToSql = [](auto joinNode) { + std::stringstream out; + for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { + if (i > 0) { + out << " AND "; + } + out << joinNode->leftKeys()[i]->name() << " = " + << joinNode->rightKeys()[i]->name(); } - break; - case core::JoinType::kAnti: - if (joinNode->isNullAware()) { + return out.str(); + }; + + if (joinNode->isLeftSemiProjectJoin()) { + sql << "SELECT " + << folly::join(", ", outputNames.begin(), --outputNames.end()); + } else { + sql << "SELECT " << folly::join(", ", outputNames); + } + + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << equiClausesToSql(joinNode); + break; + case core::JoinType::kLeftSemiFilter: + if (joinNode->leftKeys().size() > 1) { + return std::nullopt; + } sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) - << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) << " FROM u)"; - } else { - sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " - << equiClausesToSql(joinNode) 
<< ")"; - } - break; - default: - VELOX_UNREACHABLE( - "Unknown join type: {}", static_cast(joinNode->joinType())); + break; + case core::JoinType::kLeftSemiProject: + if (joinNode->isNullAware()) { + sql << ", " << joinKeysToSql(joinNode->leftKeys()) << " IN (SELECT " + << joinKeysToSql(joinNode->rightKeys()) << " FROM u) FROM t"; + } else { + sql << ", EXISTS (SELECT * FROM u WHERE " + << equiClausesToSql(joinNode) << ") FROM t"; + } + break; + case core::JoinType::kAnti: + if (joinNode->isNullAware()) { + sql << " FROM t WHERE " << joinKeysToSql(joinNode->leftKeys()) + << " NOT IN (SELECT " << joinKeysToSql(joinNode->rightKeys()) + << " FROM u)"; + } else { + sql << " FROM t WHERE NOT EXISTS (SELECT * FROM u WHERE " + << equiClausesToSql(joinNode) << ")"; + } + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } + } else { + // Nested loop join without filter. + VELOX_CHECK( + joinNode->joinCondition() == nullptr, + "This code path should be called only for nested loop join without filter"); + const std::string joinCondition{"(1 = 1)"}; + switch (joinNode->joinType()) { + case core::JoinType::kInner: + sql << " FROM t INNER JOIN u ON " << joinCondition; + break; + case core::JoinType::kLeft: + sql << " FROM t LEFT JOIN u ON " << joinCondition; + break; + case core::JoinType::kFull: + sql << " FROM t FULL OUTER JOIN u ON " << joinCondition; + break; + default: + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); + } } return queryRunner.execute(sql.str(), plan->outputType()); @@ -913,9 +968,11 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + bool withFilter) { auto planNodeIdGenerator = std::make_shared(); - const auto filter = makeJoinFilter(probeKeys, buildKeys); + const std::string filter = + 
withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -1063,6 +1120,69 @@ bool isTableScanSupported(const TypePtr& type) { return true; } +void JoinFuzzer::testCrossProduct( + const std::string& tableDir, + core::JoinType joinType, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput) { + VELOX_CHECK(probeInput.size() > 0 && buildInput.size() > 0); + const auto probeType = asRowType(probeInput[0]->type()); + const auto buildType = asRowType(buildInput[0]->type()); + auto outputColumns = + concat(asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())) + ->names(); + + auto plan = makeNestedLoopJoinPlan( + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + /*withFilter*/ false); + const auto expected = execute(plan, /*injectSpill=*/false); + + // If OOM injection is not enabled verify the results against DuckDB. 
+ if (!FLAGS_enable_oom_injection) { + if (auto duckDbResult = computeDuckDbResult( + probeInput, buildInput, plan.plan)) { + VELOX_CHECK( + assertEqualResults( + duckDbResult.value(), plan.plan->outputType(), {expected}), + "Velox and DuckDB results don't match"); + } + } + + std::vector altPlans; + if (isTableScanSupported(probeInput[0]->type()) && + isTableScanSupported(buildInput[0]->type())) { + const auto [probeScanSplits, buildScanSplits] = + generateInputSplits(tableDir, probeInput, buildInput); + altPlans.push_back(makeNestedLoopJoinPlanWithTableScan( + joinType, + probeType, + buildType, + probeKeys, + buildKeys, + probeScanSplits, + buildScanSplits, + outputColumns, + /*withFilter*/ false)); + } + addFlippedJoinPlan(plan.plan, altPlans); + + for (const auto& altPlan : altPlans) { + auto actual = execute(altPlan, /*injectSpill=*/false); + if (actual != nullptr && expected != nullptr) { + VELOX_CHECK( + assertEqualResults({expected}, {actual}), + "Logically equivalent plans produced different results"); + } + } +} + void JoinFuzzer::verify(core::JoinType joinType) { const bool nullAware = isNullAwareSupported(joinType) && vectorFuzzer_.coinToss(0.5); @@ -1093,6 +1213,32 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } + const auto tableScanDir = exec::test::TempDirectoryPath::create(); + + // Test cross product without filter with 10% chance. Avoid testing cross + // product if input size is too large. 
+ if ((core::isInnerJoin(joinType) || core::isLeftJoin(joinType) || + core::isFullJoin(joinType)) && + FLAGS_batch_size * FLAGS_num_batches <= 500) { + if (vectorFuzzer_.coinToss(0.1)) { + testCrossProduct( + tableScanDir->getPath(), + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput); + testCrossProduct( + tableScanDir->getPath(), + joinType, + probeKeys, + buildKeys, + flatProbeInput, + flatBuildInput); + return; + } + } + auto outputColumns = (core::isLeftSemiProjectJoin(joinType) || core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) @@ -1128,8 +1274,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { // If OOM injection is not enabled verify the results against DuckDB. if (!FLAGS_enable_oom_injection) { - if (auto duckDbResult = - computeDuckDbResult(probeInput, buildInput, defaultPlan.plan)) { + if (auto duckDbResult = computeDuckDbResult( + probeInput, buildInput, defaultPlan.plan)) { VELOX_CHECK( assertEqualResults( duckDbResult.value(), defaultPlan.plan->outputType(), {expected}), @@ -1151,7 +1297,6 @@ void JoinFuzzer::verify(core::JoinType joinType) { makeAlternativePlans( defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans); - const auto tableScanDir = exec::test::TempDirectoryPath::create(); addPlansWithTableScan( tableScanDir->getPath(), joinType, @@ -1249,12 +1394,14 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( const std::vector& buildKeys, const std::vector>& probeSplits, const std::vector>& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + bool withFilter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - const auto filter = makeJoinFilter(probeKeys, buildKeys); + const std::string filter = + withFilter ? 
makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .tableScan(probeType) @@ -1274,22 +1421,18 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( {buildScanId, fromConnectorSplits(buildSplits)}}}; } -void JoinFuzzer::addPlansWithTableScan( +std::tuple< + std::vector>, + std::vector>> +JoinFuzzer::generateInputSplits( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans) { + const std::vector& buildInput) { VELOX_CHECK(!tableDir.empty()); - - if (!isTableScanSupported(probeInput[0]->type()) || - !isTableScanSupported(buildInput[0]->type())) { - return; - } + VELOX_CHECK(probeInput.size() > 0 && buildInput.size() > 0); + VELOX_CHECK( + isTableScanSupported(probeInput[0]->type()) && + isTableScanSupported(buildInput[0]->type())); std::vector> probeScanSplits; for (auto i = 0; i < probeInput.size(); ++i) { @@ -1305,6 +1448,29 @@ void JoinFuzzer::addPlansWithTableScan( buildScanSplits.push_back(makeSplit(filePath)); } + return std::make_tuple(probeScanSplits, buildScanSplits); +} + +void JoinFuzzer::addPlansWithTableScan( + const std::string& tableDir, + core::JoinType joinType, + bool nullAware, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput, + const std::vector& outputColumns, + std::vector& altPlans) { + VELOX_CHECK(!tableDir.empty()); + + if (!isTableScanSupported(probeInput[0]->type()) || + !isTableScanSupported(buildInput[0]->type())) { + return; + } + + const auto [probeScanSplits, buildScanSplits] = + generateInputSplits(tableDir, probeInput, buildInput); + auto probeType = asRowType(probeInput[0]->type()); auto buildType = asRowType(buildInput[0]->type()); @@ -1336,10 +1502,18 @@ void 
JoinFuzzer::addPlansWithTableScan( const int32_t numGroups = randInt(1, probeScanSplits.size()); const std::vector groupedProbeScanSplits = generateSplitsWithGroup( - tableDir, numGroups, /*isProbe=*/true, probeKeys.size(), probeInput); + tableDir, + numGroups, + /*isProbe=*/true, + probeKeys.size(), + probeInput); const std::vector groupedBuildScanSplits = generateSplitsWithGroup( - tableDir, numGroups, /*isProbe=*/false, buildKeys.size(), buildInput); + tableDir, + numGroups, + /*isProbe=*/false, + buildKeys.size(), + buildInput); for (const auto& planWithTableScan : plansWithTableScan) { altPlans.push_back(planWithTableScan);