Make TiSpark's Explain clearer and easier to read #2439

Merged (46 commits, Jul 20, 2022)
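
The PR renames TiSpark's scan types to TiDB's EXPLAIN vocabulary: TABLE_SCAN becomes TableReader, COVERING_INDEX_SCAN becomes IndexReader, and INDEX_SCAN becomes IndexLookUp (see the ScanType renames in the BasePlanTest diff below). A minimal sketch of how to observe the effect, assuming a spark-shell with TiSpark and a catalog registered as tidb_catalog (catalog name as in the test workflow below; the database and table are invented):

// Hypothetical spark-shell session; `spark` is the ambient SparkSession.
// df.explain() prints the physical plan, in which the TiSpark scan node
// now carries the TiDB-style reader naming this PR introduces.
val df = spark.sql("SELECT * FROM tidb_catalog.test.t WHERE a = 1")
df.explain()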

Commits
8742286  change string() in TiDAGRequest (qidi1, Jun 22, 2022)
0ba3dd5  change name of scan (qidi1, Jun 28, 2022)
d8fe1c8  merge master (qidi1, Jul 1, 2022)
f04abcb  repaire test error (qidi1, Jul 1, 2022)
dcdae3c  format code (qidi1, Jul 1, 2022)
46cb114  repair word error (qidi1, Jul 1, 2022)
4441c17  repair test error (qidi1, Jul 2, 2022)
9a38567  add doc of execution plan in tispark (qidi1, Jul 4, 2022)
0a508cf  add github action of alter primary key false (qidi1, Jul 2, 2022)
e637e81  set alter-primary-key true (qidi1, Jul 4, 2022)
41dc92b  delete mutit jdk (qidi1, Jul 4, 2022)
bdf1a42  repaire TLS test (qidi1, Jul 4, 2022)
1479efe  empty line in alter-primary-key-false-test.yml (qidi1, Jul 4, 2022)
ff3bdff  delete delete-test in alter-primary-key-false-test.yml (qidi1, Jul 4, 2022)
fca8686  change doc (qidi1, Jul 4, 2022)
699e8e8  use seq in tidb-alter-primary-key-false change name in IndexScanType (qidi1, Jul 4, 2022)
5ada226  rename indexscan to indexLookup (qidi1, Jul 4, 2022)
293bcce  repair license error (qidi1, Jul 4, 2022)
0fabebe  repair tikv cant run (qidi1, Jul 5, 2022)
e04b3fd  change to use docker (qidi1, Jul 5, 2022)
8957629  change docker name (qidi1, Jul 5, 2022)
42f1b01  change to tidb4.0 compose (qidi1, Jul 5, 2022)
aaf2687  change compose-4.0 tidb version (qidi1, Jul 5, 2022)
a3eacde  change compose-4.0 tidb version (qidi1, Jul 5, 2022)
4929d65  remove matrix of yml (qidi1, Jul 5, 2022)
82e1acf  rename scan table to scan data (qidi1, Jul 6, 2022)
29b468a  simple index scan (qidi1, Jul 11, 2022)
7fc70dd  remove unused function (qidi1, Jul 11, 2022)
f8cfb8e  format code (qidi1, Jul 11, 2022)
b8e7bb9  rename function (qidi1, Jul 11, 2022)
9e679c3  rename function (qidi1, Jul 11, 2022)
27e9d3f  rename function of buildScan (qidi1, Jul 13, 2022)
9cfafb4  rewrite buildScan function (qidi1, Jul 14, 2022)
8b385a7  format code (qidi1, Jul 14, 2022)
212ab5e  format code (qidi1, Jul 14, 2022)
482ff2d  format code (qidi1, Jul 14, 2022)
2679207  resolve bug (qidi1, Jul 14, 2022)
00fa67d  resolve grammer error (qidi1, Jul 14, 2022)
cad259f  repaire bug (qidi1, Jul 17, 2022)
48eeb9e  repaire bug (qidi1, Jul 18, 2022)
5120ff4  repaire bug (qidi1, Jul 18, 2022)
fa70f87  repaire bug (qidi1, Jul 18, 2022)
3d9d4d0  repaire bug (qidi1, Jul 19, 2022)
ee6a767  repaire bug (qidi1, Jul 19, 2022)
ba16a70  Merge branch 'master' into phyicalplanexplain (shiyuhang0, Jul 19, 2022)
dce8c31  repaire bug (qidi1, Jul 19, 2022)
.github/workflows/alter-primary-key-false-test.yml (new file: 42 additions, 0 deletions)

name: alter-primary-key-false-test

on:
  push:
    branches:
      - master
  pull_request:
    branches:
      - master

jobs:
  test:
    runs-on: ubuntu-latest
    name: Java adopt sample
    steps:

      - name: checkout
        uses: actions/checkout@v2

      - name: set up JDK
        uses: actions/setup-java@v3
        with:
          java-version: '8'
          distribution: 'adopt'
          cache: maven

      - name: add host and copy properties
        run: |
          echo -e "127.0.0.1 pd0 \n127.0.0.1 tikv0" | sudo tee -a /etc/hosts
          sudo cp -r config /config
          sed -i 's/^alter-primary-key.*/alter-primary-key=false/g' ./config/tidb-4.0.toml
          echo "spark.sql.catalog.tidb_catalog=org.apache.spark.sql.catalyst.catalog.TiCatalog" > tidb_config.properties
          mv tidb_config.properties core/src/test/resources/tidb_config.properties

      - name: build docker
        run: docker-compose -f docker-compose-4.0.yaml up -d

      - name: build
        run: mvn clean package -Dmaven.test.skip=true -B

      - name: test
        run: mvn test -am -pl core -Dtest=moo -DwildcardSuites=org.apache.spark.sql.catalyst.plans.logical.LogicalPlanTestSuite -DfailIfNoTests=false
@@ -139,7 +139,7 @@ case class ColumnarRegionTaskExec(
   override def simpleString(maxFields: Int): String = verboseString(maxFields)

   override def verboseString(maxFields: Int): String =
-    s"TiSpark $nodeName{downgradeThreshold=$downgradeThreshold,downgradeFilter=${dagRequest.getFilters}"
+    s"TiSpark $nodeName{downgradeThreshold=$downgradeThreshold,downgradeFilter=${dagRequest.getDowngradeFilters}"

   private def inputRDD(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
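
This one-line fix makes the explain text for the downgrade path report the filters actually applied after a downgrade (getDowngradeFilters) rather than the ordinary pushed-down filters (getFilters). A runnable sketch of the interpolation above, with invented values (only the format string is taken from the diff):

object VerboseStringSketch extends App {
  val nodeName = "ColumnarRegionTaskExec"
  val downgradeThreshold = 1000000000L      // hypothetical threshold
  val downgradeFilters = Seq("[a EQUAL 1]") // hypothetical downgrade filters
  // Mirrors the s-interpolator in verboseString above
  println(s"TiSpark $nodeName{downgradeThreshold=$downgradeThreshold,downgradeFilter=$downgradeFilters")
  // prints: TiSpark ColumnarRegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=List([a EQUAL 1])
}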
@@ -51,7 +51,7 @@ class InvisibleIndexSuite extends BasePlanTest {
       "insert into t_invisible_index values(1, 1),(2, 2),(3, 3),(4, 4),(5, 5),(6, 6)")
     tidbStmt.execute("analyze table t_invisible_index")
     val df = spark.sql("select * from t_invisible_index where a = 1")
-    checkIsIndexScan(df, "t_invisible_index")
+    checkIsIndexLookUp(df, "t_invisible_index")
     checkIndex(df, "idx_a")
   }

@@ -65,7 +65,7 @@ class InvisibleIndexSuite extends BasePlanTest {
     tidbStmt.execute("analyze table t_invisible_index")
     val df = spark.sql("select * from t_invisible_index where a = 1")
     intercept[TestFailedException] {
-      checkIsIndexScan(df, "t_invisible_index")
+      checkIsIndexLookUp(df, "t_invisible_index")
       checkIndex(df, "idx_a")
     }
@@ -16,7 +16,7 @@

 package org.apache.spark.sql.catalyst.plans

-import com.pingcap.tikv.meta.TiDAGRequest.IndexScanType
+import com.pingcap.tikv.meta.TiDAGRequest.ScanType
 import com.pingcap.tikv.meta.{TiDAGRequest, TiIndexInfo}
 import org.apache.spark.sql.execution.{ColumnarCoprocessorRDD, ColumnarRegionTaskExec, SparkPlan}
 import org.apache.spark.sql.{BaseTiSparkTest, Dataset}
@@ -32,24 +32,38 @@ class BasePlanTest extends BaseTiSparkTest {
     case plan: ColumnarCoprocessorRDD => plan
     case plan: ColumnarRegionTaskExec => plan
   }
-  val extractDAGRequest: PartialFunction[SparkPlan, TiDAGRequest] = {
-    case plan: ColumnarRegionTaskExec => plan.dagRequest
-    case plan: ColumnarCoprocessorRDD => plan.dagRequest
+  val extractDAGRequest: PartialFunction[SparkPlan, Seq[TiDAGRequest]] = {
+    case plan: ColumnarRegionTaskExec => {
+      List(plan.dagRequest)
+    }
+    case plan: ColumnarCoprocessorRDD => {
+      plan.tiRDDs.map(x => {
+        x.dagRequest
+      })
+    }
   }

   def explain[T](df: Dataset[T]): Unit = df.explain

   def extractDAGRequests[T](df: Dataset[T]): Seq[TiDAGRequest] =
-    toPlan(df).collect { extractDAGRequest }
+    toPlan(df).collect {
+      extractDAGRequest
+    }.flatten

   def extractTiSparkPlans[T](df: Dataset[T]): Seq[SparkPlan] =
-    toPlan(df).collect { extractTiSparkPlan }
+    toPlan(df).collect {
+      extractTiSparkPlan
+    }

   def extractCoprocessorRDDs[T](df: Dataset[T]): Seq[ColumnarCoprocessorRDD] =
-    toPlan(df).collect { extractCoprocessorRDD }
+    toPlan(df).collect {
+      extractCoprocessorRDD
+    }

   def extractRegionTaskExecs[T](df: Dataset[T]): List[ColumnarRegionTaskExec] =
-    toPlan(df).collect { extractRegionTaskExec }.toList
+    toPlan(df).collect {
+      extractRegionTaskExec
+    }.toList

   def checkIndex[T](df: Dataset[T], index: String): Unit = {
     if (!extractCoprocessorRDDs(df).exists(checkIndexName(_, index))) {
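
Because a ColumnarCoprocessorRDD can wrap several TiRDDs (each carrying its own DAG request), extractDAGRequest now yields a Seq per plan node, and extractDAGRequests flattens the collected results. A self-contained sketch of this collect-then-flatten pattern, using stand-in types (Plan, One, Many, Unrelated are hypothetical; the real code matches SparkPlan subclasses and returns TiDAGRequests):

object ExtractSketch extends App {
  sealed trait Plan
  case class One(req: String) extends Plan          // like ColumnarRegionTaskExec: one request
  case class Many(reqs: List[String]) extends Plan  // like ColumnarCoprocessorRDD: one per TiRDD
  case object Unrelated extends Plan                // any non-TiSpark plan node

  val extractReqs: PartialFunction[Plan, Seq[String]] = {
    case One(r)   => List(r)
    case Many(rs) => rs
  }

  val plans = List(One("req0"), Unrelated, Many(List("req1", "req2")))
  // collect produces a List[Seq[String]]; flatten restores the flat list,
  // which is why extractDAGRequests now ends in .flatten
  println(plans.collect(extractReqs).flatten) // List(req0, req1, req2)
}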
@@ -64,41 +78,38 @@ class BasePlanTest extends BaseTiSparkTest {
   private def extractIndexInfo(coprocessorRDD: ColumnarCoprocessorRDD): TiIndexInfo =
     coprocessorRDD.dagRequest.getIndexInfo

-  def checkIsTableScan[T](df: Dataset[T], tableName: String): Unit =
-    checkIndexScanType(df, tableName, IndexScanType.TABLE_SCAN)
+  def checkIsTableReader[T](df: Dataset[T], tableName: String): Unit =
+    checkScanType(df, tableName, ScanType.TABLE_READER)

-  private def checkIndexScanType[T](
-      df: Dataset[T],
-      tableName: String,
-      indexScanType: IndexScanType): Unit = {
+  private def checkScanType[T](df: Dataset[T], tableName: String, scanType: ScanType): Unit = {
     val tiSparkPlans = extractTiSparkPlans(df)
     if (tiSparkPlans.isEmpty) {
       fail(df, "No TiSpark plans found in Dataset")
     }
-    val filteredRequests = tiSparkPlans.collect { extractDAGRequest }.filter {
+    val filteredRequests = tiSparkPlans.collect { extractDAGRequest }.flatten.filter {
       _.getTableInfo.getName.equalsIgnoreCase(tableName)
     }
     if (filteredRequests.isEmpty) {
       fail(df, s"No TiSpark plan contains desired table $tableName")
-    } else if (!tiSparkPlans.exists(checkIndexScanType(_, indexScanType))) {
+    } else if (!tiSparkPlans.exists(checkScanType(_, scanType))) {
       fail(
         df,
-        s"Index scan type not match: ${filteredRequests.head.getIndexScanType}, expected $indexScanType")
+        s"Scan type does not match: ${filteredRequests.head.getScanType}, expected $scanType")
     }
   }

-  private def checkIndexScanType(plan: SparkPlan, indexScanType: IndexScanType): Boolean =
+  private def checkScanType(plan: SparkPlan, scanType: ScanType): Boolean =
     plan match {
-      case p: ColumnarCoprocessorRDD => getIndexScanType(p).equals(indexScanType)
+      case p: ColumnarCoprocessorRDD => getScanType(p).equals(scanType)
       case _ => false
     }

-  private def getIndexScanType(coprocessorRDD: ColumnarCoprocessorRDD): IndexScanType = {
-    getIndexScanType(coprocessorRDD.dagRequest)
+  private def getScanType(coprocessorRDD: ColumnarCoprocessorRDD): ScanType = {
+    getScanType(coprocessorRDD.dagRequest)
   }

-  private def getIndexScanType(dagRequest: TiDAGRequest): IndexScanType = {
-    dagRequest.getIndexScanType
+  private def getScanType(dagRequest: TiDAGRequest): ScanType = {
+    dagRequest.getScanType
   }

   /**
@@ -109,19 +120,19 @@ class BasePlanTest extends BaseTiSparkTest {
     fail(message)
   }

-  def checkIsCoveringIndexScan[T](df: Dataset[T], tableName: String): Unit =
-    checkIndexScanType(df, tableName, IndexScanType.COVERING_INDEX_SCAN)
+  def checkIsIndexReader[T](df: Dataset[T], tableName: String): Unit =
+    checkScanType(df, tableName, ScanType.INDEX_READER)

-  def checkIsIndexScan[T](df: Dataset[T], tableName: String): Unit =
-    checkIndexScanType(df, tableName, IndexScanType.INDEX_SCAN)
+  def checkIsIndexLookUp[T](df: Dataset[T], tableName: String): Unit =
+    checkScanType(df, tableName, ScanType.INDEX_LOOKUP)

   def checkEstimatedRowCount[T](df: Dataset[T], tableName: String, answer: Double): Unit = {
     val estimatedRowCount = getEstimatedRowCount(df, tableName)
     assert(estimatedRowCount === answer)
   }

   def getEstimatedRowCount[T](df: Dataset[T], tableName: String): Double =
-    extractTiSparkPlans(df).collect { extractDAGRequest }.head.getEstimatedCount
+    extractDAGRequests(df).head.getEstimatedCount

   def toPlan[T](df: Dataset[T]): SparkPlan = df.queryExecution.sparkPlan
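
For context, this is how a plan test would exercise the renamed helpers. A hedged sketch only: table t, columns a and b, and the index on a are invented, and the suite skeleton assumes the usual BaseTiSparkTest setup:

class ScanTypeSuite extends BasePlanTest {
  test("scan types map to TiDB-style reader names") {
    val full = spark.sql("select * from t where b = 1")      // b not indexed: full scan
    checkIsTableReader(full, "t")                            // was checkIsTableScan

    val covering = spark.sql("select a from t where a = 1")  // index on a covers the output
    checkIsIndexReader(covering, "t")                        // was checkIsCoveringIndexScan

    val lookup = spark.sql("select * from t where a = 1")    // index probe, then row fetch
    checkIsIndexLookUp(lookup, "t")                          // was checkIsIndexScan
  }
}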