
Commit

repair word error
Signed-off-by: qidi1 <1083369179@qq.com>
qidi1 committed Jul 1, 2022
1 parent dcdae3c commit 46cb114
Showing 3 changed files with 54 additions and 737 deletions.
@@ -2,7 +2,7 @@
* Copyright 2019 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file expect in compliance with the License.
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -130,35 +130,35 @@ class LogicalPlanTestSuite extends BasePlanTest {
// TableScan with Selection and without RangeFilter.
val df1 = spark.sql("select * from t1 where a>0 and b>'aa'")
val dag1 = extractDAGRequests(df1).head
val expection1 =
val expectation1 =
"== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiKV CoprocessorRDD{[table: t1] TableReader, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255): " +
"{ TableRangeScan: { RangeFilter: [], Range: [%s] }, Selection: [%s] }, startTs: %d}".trim
val selection1 = dag1.getFilters.toArray().mkString(", ")
val myException1 =
expection1.format(extractRangeFromDAG(dag1), selection1, dag1.getStartTs.getVersion)
val myExpectation1 =
expectation1.format(extractRangeFromDAG(dag1), selection1, dag1.getStartTs.getVersion)
val sparkPhysicalPlan1 =
df1.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
assert(myException1.equals(sparkPhysicalPlan1))
assert(myExpectation1.equals(sparkPhysicalPlan1))

// TableScan with complex sql statements
val df2 = spark.sql(
"select * from t1 where a>0 or b > 'aa' or c<'cc' and c>'aa' order by(c) limit(10)")
val dag2 = extractDAGRequests(df2).head
val expection2 =
val expectation2 =
"== Physical Plan ==\n" +
"TakeOrderedAndProject(limit=10, orderBy=[c#171 ASC NULLS FIRST], output=[a#169L,b#170,c#171])\n" +
"+- *(1) ColumnarToRow\n" +
" +- TiKV CoprocessorRDD{[table: t1] TableReader, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255): " +
"{ TableRangeScan: { RangeFilter: [], Range: [%s] }, Selection: [%s], " +
"Order By: [c@VARCHAR(255) ASC], Limit: [10] }, startTs: %d}".trim
val selection2 = dag2.getFilters.toArray().mkString(", ")
val myException2 =
expection2.format(extractRangeFromDAG(dag2), selection2, dag2.getStartTs.getVersion)
val myExpectation2 =
expectation2.format(extractRangeFromDAG(dag2), selection2, dag2.getStartTs.getVersion)
val sparkPhysicalPlan2 =
df2.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
assert(myException2.equals(sparkPhysicalPlan2))
assert(myExpectation2.equals(sparkPhysicalPlan2))
}
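
The tests above all follow one pattern: build a plan template with %s/%d placeholders, fill it with values read back from the DAG request (ranges, filters, startTs), and compare against Spark's own explain output. A minimal sketch of that comparison using only stock Spark 3.x APIs; the local session and the t_demo view are illustrative, not part of this suite.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{ExplainMode, SimpleMode}

object ExplainComparisonSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
  spark.range(10).toDF("a").createOrReplaceTempView("t_demo")
  val df = spark.sql("select * from t_demo where a > 0")
  // The suite formats its template with runtime values and asserts
  // string equality against this trimmed explain output.
  val actual = df.queryExecution
    .explainString(ExplainMode.fromString(SimpleMode.name))
    .trim
  println(actual) // a formatted template would be asserted against this
  spark.stop()
}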

test("test physical plan explain which table with cluster index") {
@@ -195,51 +195,51 @@ class LogicalPlanTestSuite extends BasePlanTest {
// TableScan with Selection and with RangeFilter.
val df1 = spark.sql("select * from t1 where a>0 and b>'aa'")
val dag1 = extractDAGRequests(df1).head
val exception1 =
val expectation1 =
"== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiKV CoprocessorRDD{[table: t1] TableReader, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255): " +
"{ TableRangeScan: { RangeFilter: [%s], Range: [%s] }, Selection: [%s] }, startTs: %d}".trim
val rangeFilter1 = dag1.getRangeFilter.toArray().mkString(", ")
val selection1 = dag1.getFilters.toArray.mkString(", ")
val myException1 = exception1.format(
val myExpectation1 = expectation1.format(
rangeFilter1,
extractRangeFromDAG(dag1),
selection1,
dag1.getStartTs.getVersion)
val sparkPhysicalPlan1 =
df1.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
assert(myException1.equals(sparkPhysicalPlan1))
assert(myExpectation1.equals(sparkPhysicalPlan1))

// TableScan without Selection and with RangeFilter.
val df2 = spark.sql("select * from t1 where a>0")
val dag2 = extractDAGRequests(df2).head
val exception2 =
val expectation2 =
"== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiKV CoprocessorRDD{[table: t1] TableReader, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255): " +
"{ TableRangeScan: { RangeFilter: [%s], Range: [%s] } }, startTs: %d}".trim
val rangeFilter2 = dag2.getRangeFilter.toArray().mkString(", ")
val myException2 =
exception2.format(rangeFilter2, extractRangeFromDAG(dag2), dag2.getStartTs.getVersion)
val myExpectation2 =
expectation2.format(rangeFilter2, extractRangeFromDAG(dag2), dag2.getStartTs.getVersion)
val sparkPhysicalPlan2 =
df2.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
assert(myException2.equals(sparkPhysicalPlan2))
assert(myExpectation2.equals(sparkPhysicalPlan2))

// TableScan with Selection and without RangeFilter.
val df3 = spark.sql("select * from t1 where b>'aa'")
val dag3 = extractDAGRequests(df3).head
val exception3 =
val expectation3 =
("== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiKV CoprocessorRDD{[table: t1] TableReader, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255): { " +
"TableRangeScan: { RangeFilter: [], Range: [%s] }, Selection: [%s] }, startTs: %d}").trim
val selection3 = dag3.getFilters.toArray().mkString(", ")
val myException3 =
exception3.format(extractRangeFromDAG(dag3), selection3, dag3.getStartTs.getVersion)
val myExpectation3 =
expectation3.format(extractRangeFromDAG(dag3), selection3, dag3.getStartTs.getVersion)
val sparkPhysicalPlan3 =
df3.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
assert(myException3.equals(sparkPhysicalPlan3))
assert(myExpectation3.equals(sparkPhysicalPlan3))
}
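
One subtlety in the templates above: this suite mixes a trailing .trim on the last string literal (as in the TableRangeScan templates) with a parenthesized (...).trim over the whole concatenation. Both pass here because the literals carry no stray whitespace, but the two spellings are not equivalent in general; a small sketch of the difference:

object TrimPrecedenceSketch extends App {
  // Method calls bind tighter than +, so .trim applies to "tail  " only
  // and the leading spaces on "  head" survive.
  val lastLiteralOnly = "  head\n" + "tail  ".trim
  // Parentheses build the full string first, then trim both ends.
  val wholeExpression = ("  head\n" + "tail  ").trim
  println(lastLiteralOnly == wholeExpression) // false
}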

test("test physical plan explain which table with cluster index and partition") {
@@ -258,7 +258,7 @@ class LogicalPlanTestSuite extends BasePlanTest {
// TableScan with Selection and with RangeFilter.
val df = spark.sql("select b from t1 where t1.b>'aa'")
val dags = extractDAGRequests(df)
val exception =
val expectation =
"== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" + "" +
"+- partition table[\n" +
@@ -271,7 +271,7 @@ class LogicalPlanTestSuite extends BasePlanTest {
"]".trim
val selection0 = dags.head.getFilters.toArray.mkString(", ")
val selection1 = dags(1).getFilters.toArray.mkString(", ")
val myExceptionPlan = exception.format(
val myExpectationPlan = expectation.format(
extractRangeFromDAG(dags.head),
selection0,
dags.head.getStartTs.getVersion,
@@ -280,7 +280,7 @@ class LogicalPlanTestSuite extends BasePlanTest {
dags(1).getStartTs.getVersion)
val sparkPhysicalPlan =
df.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
assert(myExceptionPlan.equals(sparkPhysicalPlan))
assert(myExpectationPlan.equals(sparkPhysicalPlan))
}
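
Each partition of t1 contributes its own DAG request, so the expected plan above interpolates one (range, selection, startTs) triple per partition. A sketch of that assembly step with invented values (the real ones come from extractDAGRequests):

object PartitionTemplateSketch extends App {
  // One fragment per partition; %s/%d placeholders mirror the template above.
  val fragment =
    "TableRangeScan: { RangeFilter: [], Range: [%s] }, Selection: [%s] }, startTs: %d"
  val partitions = Seq(
    ("range-p0", "[t1.b GREATER_THAN \"aa\"]", 1L),
    ("range-p1", "[t1.b GREATER_THAN \"aa\"]", 2L))
  val body = partitions
    .map { case (range, sel, ts) => fragment.format(range, sel, ts) }
    .mkString("\n")
  println(body)
}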

test("test physical plan explain which table with secondary index") {
@@ -296,7 +296,7 @@ class LogicalPlanTestSuite extends BasePlanTest {
// IndexScan with Selection and with RangeFilter.
val df1 = spark.sql("SELECT * FROM t1 where a>0 and b > 'aa'")
val dag1 = extractDAGRequests(df1).head
val exception1 =
val expectation1 =
"== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[%s]\n" +
@@ -308,7 +308,7 @@ class LogicalPlanTestSuite extends BasePlanTest {
val downgradeFilter1 = dag1.getDowngradeFilters.toArray.mkString(", ")
val rangeFilter1 = dag1.getRangeFilter.toArray.mkString(", ")
val selection1 = dag1.getFilters.toArray.mkString(", ")
val myExceptionPlan1 = exception1.format(
val myExpectationPlan1 = expectation1.format(
downgradeFilter1,
rangeFilter1,
extractRangeFromDAG(dag1),
@@ -317,12 +317,12 @@ class LogicalPlanTestSuite extends BasePlanTest {
dag1.getStartTs.getVersion)
val sparkPhysicalPlan1 =
df1.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
assert(myExceptionPlan1.equals(sparkPhysicalPlan1))
assert(myExpectationPlan1.equals(sparkPhysicalPlan1))

// IndexScan without Selection and with RangeFilter.
val df2 = spark.sql("SELECT * FROM t1 where a=0 and b > 'aa'")
val dag2 = extractDAGRequests(df2).head
val exception2 =
val expectation2 =
("== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[%s]\n" +
Expand All @@ -334,17 +334,17 @@ class LogicalPlanTestSuite extends BasePlanTest {
df2.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
val downgradeFilter2 = dag2.getDowngradeFilters.toArray.mkString(", ")
val rangeFilter2 = dag2.getRangeFilter.toArray.mkString(", ")
val myExceptionPlan2 = exception2.format(
val myExpectationPlan2 = expectation2.format(
downgradeFilter2,
rangeFilter2,
extractRangeFromDAG(dag2),
dag2.getStartTs.getVersion)
assert(myExceptionPlan2.equals(sparkPhysicalPlan2))
assert(myExpectationPlan2.equals(sparkPhysicalPlan2))

// CoveringIndex with Selection and with RangeFilter.
val df3 = spark.sql("SELECT a,b FROM t1 where a>0 and b > 'aa'")
val dag3 = extractDAGRequests(df3).head
val exception3 =
val expectation3 =
("== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiKV CoprocessorRDD{[table: t1] IndexReader, Columns: a@LONG, b@VARCHAR(255): { " +
Expand All @@ -354,13 +354,13 @@ class LogicalPlanTestSuite extends BasePlanTest {
df3.queryExecution.explainString(ExplainMode.fromString(SimpleMode.name)).trim
val rangeFilter3 = dag3.getRangeFilter.toArray.mkString(", ")
val selection3 = dag3.getFilters.toArray.mkString(", ")
val myExceptionPlan3 = exception3.format(
val myExpectationPlan3 = expectation3.format(
rangeFilter3,
extractRangeFromDAG(dag3),
selection3,
dag3.getStartTs.getVersion)
assert(
myExceptionPlan3
myExpectationPlan3
.equals(sparkPhysicalPlan3))

// IndexScan with complex sql statements
@@ -372,38 +372,38 @@ class LogicalPlanTestSuite extends BasePlanTest {
val rangeFilter4 = dag4.getRangeFilter.toArray.mkString(", ")
val downgradeFilter4 = dag4.getDowngradeFilters.toArray.mkString(", ")
val selection4 = dag4.getFilters.toArray.mkString(", ")
var exceptRegionTaskExec4 =
var expectRegionTaskExec4 =
("TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[%s]")
exceptRegionTaskExec4 = exceptRegionTaskExec4.format(downgradeFilter4)
var exceptDAG4 = "[table: t1] IndexLookUp, Columns: c@VARCHAR(255), a@LONG: " +
expectRegionTaskExec4 = expectRegionTaskExec4.format(downgradeFilter4)
var expectDAG4 = "[table: t1] IndexLookUp, Columns: c@VARCHAR(255), a@LONG: " +
"{ {IndexRangeScan(Index:testindex(a,b)): " +
"{ RangeFilter: [%s], " +
"Range: [%s] }}; " +
"{TableRowIDScan, Selection: [%s], Aggregates: Max(c@VARCHAR(255)), First(c@VARCHAR(255)), " +
"Group By: [c@VARCHAR(255) ASC]} }, startTs: %d"
exceptDAG4 = exceptDAG4.format(
expectDAG4 = expectDAG4.format(
rangeFilter4,
extractRangeFromDAG(dag4),
selection4,
dag4.getStartTs.getVersion)
assert(exceptRegionTaskExec4.equals(regionTaskExec4))
assert(exceptDAG4.equals(dag4.toString))
assert(expectRegionTaskExec4.equals(regionTaskExec4))
assert(expectDAG4.equals(dag4.toString))

// IndexScan with complex sql statements
val df5 = spark.sql("select sum(a) from t1 where a>0 and b > 'aa' or b<'cc' and a>0")
val dag5 = extractDAGRequests(df5).head
val exception5 =
val expectation5 =
("[table: t1] IndexReader, Columns: a@LONG, b@VARCHAR(255): " +
"{ IndexRangeScan(Index:testindex(a,b)): " +
"{ RangeFilter: [%s], Range: [%s] }, Selection: [%s], Aggregates: Sum(a@LONG) }, startTs: %d").trim
val rangeFilter5 = dag5.getRangeFilter.toArray.mkString(", ")
val selection5 = dag5.getFilters.toArray.mkString(", ")
val myException5 = exception5.format(
val myExpectation5 = expectation5.format(
rangeFilter5,
extractRangeFromDAG(dag5),
selection5,
dag5.getStartTs.getVersion)
assert(myException5.equals(dag5.toString.trim))
assert(myExpectation5.equals(dag5.toString.trim))
}
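
The IndexLookUp cases above check two strings: the RegionTaskExec operator (rendered with verboseString) and the DAG request's toString. A stock-Spark sketch of pulling one operator's verbose description out of an executed plan; the Filter match and the t_demo view are illustrative:

import org.apache.spark.sql.SparkSession

object NodeDescriptionSketch extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  spark.range(10).toDF("a").createOrReplaceTempView("t_demo")
  val plan =
    spark.sql("select * from t_demo where a > 3").queryExecution.executedPlan
  // collectFirst walks the plan tree; verboseString truncates long field
  // lists to the given number of fields (25 in this suite).
  plan.collectFirst { case node if node.nodeName.contains("Filter") => node }
    .foreach(node => println(node.verboseString(25)))
  spark.stop()
}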

test("test physical plan explain which table with secondary prefix index") {
@@ -419,7 +419,7 @@ class LogicalPlanTestSuite extends BasePlanTest {
// IndexScan with RangeFilter and with Selection.
val df1 = spark.sql("SELECT * FROM t1 where a>0 and b > 'aa'")
val dag1 = extractDAGRequests(df1).head
val exception1 =
val expectation1 =
("== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[%s]\n" +
@@ -432,20 +432,20 @@ class LogicalPlanTestSuite extends BasePlanTest {
val downgradeFilter1 = dag1.getDowngradeFilters.toArray.mkString(", ")
val rangeFilter1 = dag1.getRangeFilter.toArray.mkString(", ")
val selection1 = dag1.getFilters.toArray.mkString(", ")
val myExceptionPlan1 = exception1.format(
val myExpectationPlan1 = expectation1.format(
downgradeFilter1,
rangeFilter1,
extractRangeFromDAG(dag1),
selection1,
dag1.getStartTs.getVersion)
assert(
myExceptionPlan1
myExpectationPlan1
.equals(sparkPhysicalPlan1))

// IndexScan with RangeFilter and without Selection.
val df2 = spark.sql("SELECT * FROM t1 where b > 'aa'")
val dag2 = extractDAGRequests(df2).head
val exception2 =
val expectation2 =
("== Physical Plan ==\n" +
"*(1) ColumnarToRow\n" +
"+- TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[%s]\n" +
@@ -458,14 +458,14 @@ class LogicalPlanTestSuite extends BasePlanTest {
val downgradeFilter2 = dag2.getDowngradeFilters.toArray.mkString(", ")
val rangeFilter2 = dag2.getRangeFilter.toArray.mkString(", ")
val selection2 = dag2.getFilters.toArray.mkString(", ")
val myExceptionPlan2 = exception2.format(
val myExpectationPlan2 = expectation2.format(
downgradeFilter2,
rangeFilter2,
extractRangeFromDAG(dag2),
selection2,
dag2.getStartTs.getVersion)
assert(
myExceptionPlan2
myExpectationPlan2
.equals(sparkPhysicalPlan2))

// IndexScan with complex sql statements
@@ -475,16 +475,16 @@ class LogicalPlanTestSuite extends BasePlanTest {
extractRegionTaskExecs(df3).head.verboseString(25).trim
val downgradeFilter3 = dag3.getDowngradeFilters.toArray.mkString(", ")
val selection3 = dag3.getFilters.toArray.mkString(", ")
var exceptRegionTaskExec3 =
var expectRegionTaskExec3 =
("TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[%s]")
exceptRegionTaskExec3 = exceptRegionTaskExec3.format(downgradeFilter3)
var exceptDAG3 = "[table: t1] IndexLookUp, Columns: b@VARCHAR(255), a@LONG: " +
expectRegionTaskExec3 = expectRegionTaskExec3.format(downgradeFilter3)
var expectDAG3 = "[table: t1] IndexLookUp, Columns: b@VARCHAR(255), a@LONG: " +
"{ {IndexRangeScan(Index:testindex(b,a)): { RangeFilter: [], Range: [%s] }}; " +
"{TableRowIDScan, Selection: [%s], Aggregates: Sum(a@LONG)} }, startTs: %d"
exceptDAG3 =
exceptDAG3.format(extractRangeFromDAG(dag3), selection3, dag3.getStartTs.getVersion)
assert(exceptRegionTaskExec3.equals(regionTaskExec3))
assert(exceptDAG3.equals(dag3.toString))
expectDAG3 =
expectDAG3.format(extractRangeFromDAG(dag3), selection3, dag3.getStartTs.getVersion)
assert(expectRegionTaskExec3.equals(regionTaskExec3))
assert(expectDAG3.equals(dag3.toString))
}
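
A final note on the assertions throughout this file: on String, Scala's == delegates to equals and is additionally null-safe, so assert(a == b) is the more idiomatic spelling of assert(a.equals(b)). A two-line sketch:

object StringEqualitySketch extends App {
  val expected = "plan"
  val actual = "plan"
  assert(expected.equals(actual)) // spelling used in this suite
  assert(expected == actual)      // idiomatic Scala; also tolerates null on the left
}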

// https://github.com/pingcap/tispark/issues/1498
