In TiSpark, the physical plan describes how a query is physically executed. When we call explain in Spark running with TiSpark, the physical plan is printed to the terminal. The current display of how the plan executes has several problems:
- Residual Filter should not appear in the physical plan explanation.
- unclear representation of the execution process.
- the expression(s) that constitute the scanning range are hard to know.
- confusing naming of the operators.
The display of the physical plan needs to be improved.
If we call Spark's explain, we may see output like the following.
== Physical Plan ==
*(1) ColumnarToRow
// many other nodes.
+- ...
+- TiKV CoprocessorRDD{[table: t1] TableScan, Columns: a@UNSIGNED LONG, Residual Filter: [a@UNSIGNED LONG GREATER_THAN 1], PushDown Filter: [a@UNSIGNED LONG GREATER_THAN 1], KeyRange: [([t\200\000\000\000\000\000\004W_r\000\000\000\000\000\000\000\000], [t\200\000\000\000\000\000\004W_s\000\000\000\000\000\000\000\000])], Aggregates: , startTs: 433780573880188929}
// many other nodes.
+- ...
What we are interested in here is the content of CoprocessorRDD. This node is the output of TiSpark; the other nodes are the output of Spark. The content of the CoprocessorRDD is produced by the toStringInternal method of TiDAGRequest.
First of all, let's introduce a few member variables of TiDAGRequest.
- indexInfo is the index to be scanned by the physical plan. If it is empty, the plan scans the table.
- isDoubleRead: true means scanning the table after scanning the index; false means scanning only the index or only the table.
- filters is the selection expression of the physical plan under normal execution.
- downgradeFilters is the selection condition of the physical plan after a downgrade is triggered (downgrade is described later).
- pushDownFilters is only used in toStringInternal. When indexInfo==null, or isDoubleRead==false, or all the columns involved in the filter are in the index, the content of pushDownFilters is equal to filters; otherwise, pushDownFilters is empty.
- keyRange is the scan range of the table or index.
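The rule for pushDownFilters can be written as a small function. This is a minimal Python sketch of the rule as stated above, not the actual TiDAGRequest code: the Filter class, the function name, and the way index columns are passed in are all hypothetical stand-ins.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Optional, Sequence


@dataclass(frozen=True)
class Filter:
    """Hypothetical stand-in for a filter expression."""
    text: str                # printable form, e.g. "a > 0"
    columns: FrozenSet[str]  # columns the expression refers to


def push_down_filters(
    index_columns: Optional[Sequence[str]],  # None models indexInfo == null
    is_double_read: bool,
    filters: List[Filter],
) -> List[Filter]:
    """Return the content of pushDownFilters according to the rule in the text."""
    if index_columns is None or not is_double_read:
        return list(filters)
    # Columns referenced by any filter expression.
    used = set().union(*(f.columns for f in filters))
    if used <= set(index_columns):
        return list(filters)
    return []


a_gt_0 = Filter("a > 0", frozenset({"a"}))
c_gt_x = Filter("c > 'x'", frozenset({"c"}))
print(push_down_filters(None, False, [a_gt_0, c_gt_x]))  # no index: same as filters
print(push_down_filters(["a", "b"], True, [a_gt_0]))     # filter columns are in the index
print(push_down_filters(["a", "b"], True, [c_gt_x]))     # c is not in the index: empty
```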
The main code is in toStringInternal.
From the code we can see that there are three kinds of execution plan: IndexScan, CoveringIndexScan, and TableScan. TiDAGRequest has three kinds of Filter: Downgrade Filter, Residual Filter, and PushDown Filter. An execution plan has zero or more Filters. We describe the meaning of each execution plan and Filter in detail below.
In TiSpark, an IndexScan requires two steps. The first step scans the index data, and the second step scans the table data to fetch the rows we need, based on the results returned by the first scan.
CoveringIndexScan is a special case of IndexScan. If the column(s) in the Filter and the column(s) in the Projection are all inside the index, we only need to scan the index once and do not need to scan the table. Such an index-only scan is called CoveringIndexScan.
TableScan is different from IndexScan and CoveringIndexScan. TableScan only scans table data.
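The distinction between the three kinds of scan can be summarized in a few lines. The sketch below is illustrative only: the function name and its arguments are hypothetical, and it assumes the index (if any) has already been chosen, which the real planner decides elsewhere.

```python
from typing import Iterable, Optional, Sequence


def choose_scan(
    index_columns: Optional[Sequence[str]],  # None means no index is used
    filter_columns: Iterable[str],
    projection_columns: Iterable[str],
) -> str:
    """Classify a plan as TableScan, CoveringIndexScan, or IndexScan."""
    if index_columns is None:
        return "TableScan"
    needed = set(filter_columns) | set(projection_columns)
    if needed <= set(index_columns):
        # Every column we filter on or return lives in the index: one scan is enough.
        return "CoveringIndexScan"
    # Otherwise scan the index first, then the table.
    return "IndexScan"


# Index testIndex(a, b) on t1(a, b, c), as in the examples in this document.
print(choose_scan(["a", "b"], ["a"], ["a"]))                  # SELECT a FROM t1 where a>0
print(choose_scan(["a", "b"], ["a", "b"], ["a", "b", "c"]))   # SELECT * ... where a>0 and b>'aa'
print(choose_scan(None, ["a", "b"], ["a", "b", "c"]))         # no index
```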
If the second scan of an IndexScan sends too many queries to COP (TiKV Coprocessor)/TiKV, it can cause performance problems in COP/TiKV. To solve this problem, a downgrade mechanism is introduced. The Downgrade Filter is used after an IndexScan is downgraded.
The first scan of IndexScan returns the data that meets the conditions. TiSpark then sorts and aggregates the returned data to obtain the regionTasks that the second scan of IndexScan needs to run. If the number of regionTasks is bigger than downgradeThreshold, a downgrade is triggered. When a downgrade is triggered, the range of the second scan (on the table) changes from the values returned by the first scan (on the index) to all values between the minimum and maximum value returned by the first scan, and the filters of the second scan change to downgradeFilters (downgradeFilters is the same as the filters the plan would have if it were a TableScan).
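The effect of a downgrade on the second scan can be sketched as follows. This is a simplified Python model under stated assumptions, not TiSpark's implementation: handles are modelled as integers, the non-downgraded scan is modelled as one point range per handle, and the function name and returned dictionary are hypothetical.

```python
from typing import Dict, Iterable, List


def plan_second_scan(
    handles: Iterable[int],        # values returned by the first (index) scan
    region_task_count: int,
    downgrade_threshold: int,
    filters: List[str],
    downgrade_filters: List[str],
) -> Dict[str, object]:
    """Pick the ranges and filters used by the second (table) scan."""
    ordered = sorted(set(handles))
    if not ordered:
        return {"downgraded": False, "ranges": [], "filters": list(filters)}
    if region_task_count > downgrade_threshold:
        # Downgrade: scan everything between the minimum and maximum value.
        return {
            "downgraded": True,
            "ranges": [(ordered[0], ordered[-1])],
            "filters": list(downgrade_filters),
        }
    # Normal execution: read only the returned values.
    return {
        "downgraded": False,
        "ranges": [(h, h) for h in ordered],
        "filters": list(filters),
    }


print(plan_second_scan([1, 3, 4, 5], 3, 2, ["b > 'aa'"], ["b > 'aa'", "a > 0"]))
print(plan_second_scan([1, 3, 4, 5], 3, 10, ["b > 'aa'"], ["b > 'aa'", "a > 0"]))
```

The widened range may contain rows that the index scan never returned, which is why the downgraded scan needs the full downgradeFilters rather than filters.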
RegionTask
Among all returned data, each run of consecutive data within one region is treated as a regionTask. For example, suppose the data returned in the first stage are 1, 3, 4, 5, where 1, 3, 4 are in the same region and 5 is in another region. Since 1 is not contiguous with 3 and 4, 1 is a regionTask. Since 3, 4 and 5 are not in the same region, 3, 4 is a regionTask and 5 is a regionTask. The number of regionTasks is three.
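The counting rule can be checked with a short Python sketch that reproduces the example above. It assumes integer values where "contiguous" means differing by one, and it takes the region lookup as a hypothetical callback; the real code works on TiKV keys and region metadata.

```python
from typing import Callable, Hashable, Iterable


def count_region_tasks(
    handles: Iterable[int],
    region_of: Callable[[int], Hashable],  # hypothetical lookup: value -> region id
) -> int:
    """Count runs of contiguous values that stay inside one region."""
    tasks = 0
    previous = None
    for handle in sorted(set(handles)):
        starts_new_task = (
            previous is None
            or handle != previous + 1                   # gap in the values
            or region_of(handle) != region_of(previous)  # region boundary crossed
        )
        if starts_new_task:
            tasks += 1
        previous = handle
    return tasks


# 1, 3, 4 live in one region and 5 in another, as in the example.
regions = {1: "region-1", 3: "region-1", 4: "region-1", 5: "region-2"}
print(count_region_tasks([1, 3, 4, 5], regions.__getitem__))  # 3
```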
In the original design, the Residual Filter represents operators that cannot be pushed down to COP/TiKV. However, in the current implementation, TiSpark checks whether each operator can be pushed down before constructing the TiDAGRequest, and only operators that can be pushed down take part in its construction. This means that all operators in TiDAGRequest can be pushed down to COP/TiKV. The Residual Filter loses its original meaning because TiDAGRequest contains no operators that cannot be pushed down.
The PushDown Filter is the expression passed to COP/TiKV as the selection expression when no downgrade is triggered.
- Residual Filter should not appear in physical plan explanation.
  As stated before, the Residual Filter loses its meaning and should not be present in the physical plan explanation. But Residual Filter still appears in TableScan and CoveringIndexScan.
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
  select * from t1 where a>0 and b>'aa'
  == Physical Plan ==
  *(1) ColumnarToRow
  +- TiKV CoprocessorRDD{[table: t1] TableScan, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255), Residual Filter: [a@LONG GREATER_THAN 0], [b@VARCHAR(255) GREATER_THAN "aa"], PushDown Filter: [a@LONG GREATER_THAN 0], [b@VARCHAR(255) GREATER_THAN "aa"], KeyRange: [([t\200\000\000\000\000\000\rE_r\000\000\000\000\000\000\000\000], [t\200\000\000\000\000\000\rE_s\000\000\000\000\000\000\000\000])], startTs: 434247263670239233}
- unclear representation of the execution process
  - unable to know the execution steps
    As stated before, the execution process for IndexScan is divided into two phases: the first phase scans the index data, and the second phase scans the table data. However, we cannot see this execution process in the physical plan explanation.
  - the displayed Filter is not the one that would be executed under normal execution
    The Downgrade Filter in RegionTaskExec is designed to indicate that the Downgrade Filter will be used after the downgrade. (There is a problem with the content displayed in RegionTaskExec: it should show the content of downgradeFilters in TiDAGRequest, but it displays the content of filters in TiDAGRequest. We will fix this bug here.) However, the Downgrade Filter is displayed again in the FetchHandleRDD, and the PushDown Filter, which is executed under normal circumstances, is not displayed.
    RegionTaskExec: RegionTaskExec is the node that determines whether to downgrade.
    FetchHandleRDD: When the isDoubleRead variable of TiDAGRequest is true, the CoprocessorRDD is called FetchHandleRDD.
  - information about the used index is hard to know
    In the physical plan explanation, only the name of the index we use is shown, not the columns that make up the index.
    CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
    CREATE INDEX `testIndex` ON `t1` (`a`,`b`)
    SELECT * FROM t1 where a>0 and b > 'aa'
    == Physical Plan ==
    *(1) ColumnarToRow
    +- TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[[b@VARCHAR(255) GREATER_THAN "aa"]]
    +- RowToColumnar
    +- TiKV FetchHandleRDD{[table: t1] IndexScan[Index: testindex] , Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255), Downgrade Filter: [b@VARCHAR(255) GREATER_THAN "aa"], [a@LONG GREATER_THAN 0], KeyRange: [([t\200\000\000\000\000\000\rA_i\200\000\000\000\000\000\000\001\003\200\000\000\000\000\000\000\001], [t\200\000\000\000\000\000\rA_i\200\000\000\000\000\000\000\001\372])], startTs: 434247241322725377}
- expression(s) that constitutes the scanning range is hard to know
  The expression(s) used to build the ranges are not shown explicitly, which makes it difficult to know which expression(s) a range was built from. As shown below, the expression a>0 in the query is pushed down, but this information is only given implicitly inside the KeyRange, which is hard for users to understand.
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
  CREATE INDEX `testIndex` ON `t1` (`a`,`b`)
  SELECT a FROM t1 where a>0
  == Physical Plan ==
  *(1) ColumnarToRow
  +- TiKV CoprocessorRDD{[table: t1] CoveringIndexScan[Index: testindex] , Columns: a@LONG, KeyRange: [([t\200\000\000\000\000\000\r=_i\200\000\000\000\000\000\000\001\003\200\000\000\000\000\000\000\001], [t\200\000\000\000\000\000\r=_i\200\000\000\000\000\000\000\001\372])], startTs: 434247223260741633}
- confusing naming of the operator
  In TiDB, a process named Scan means scanning a piece of data, not aggregating and computing the result of the scan. But in TiSpark, a process named scan means aggregating and computing the results of the scan. This may confuse users who switch from TiDB to TiSpark.
  The selection expression pushed down to TiKV/COP is called Selection in TiDB, while it is called PushDown Filter in TiSpark. This may also confuse users who switch from TiDB to TiSpark.
To solve the problem of a filter appearing that should not appear, we removed the Residual Filter.
As shown below, for the same table and query as in The Problem of DAG Explain section, the Residual Filter has been removed from the explain output.
== Physical Plan ==
*(1) ColumnarToRow
+- TiKV CoprocessorRDD{[table: t1] TableReader, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255): { TableRangeScan: { RangeFilter: [], Range: [([t\200\000\000\000\000\000\r\'_r\000\000\000\000\000\000\000\000], [t\200\000\000\000\000\000\r\'_s\000\000\000\000\000\000\000\000])] }, Selection: [[a@LONG GREATER_THAN 1]] }, startTs: 434245944457035777}
- unable to know the execution steps
  To show that the execution process of IndexScan is divided into two phases, we split the scan of IndexScan into two parts, IndexRangeScan and TableRowIDScan. The scan of TableScan is named TableRangeScan. The scan of CoveringIndexScan is named IndexRangeScan. The meanings of these scans are shown below.
  - TableRangeScan: Table scans with the specified range. We consider a full table scan a special case of TableRangeScan, so a full table scan is also called TableRangeScan.
  - TableRowIDScan: Scans the table data based on the RowID. Usually follows an index read operation to retrieve the matching data rows.
  - IndexRangeScan: Index scans with the specified range. We consider a full index scan a special case of IndexRangeScan, so a full index scan is also called IndexRangeScan.
- the displayed Filter is not the one that would be executed under normal execution
  To display the Filter that is executed under normal execution, we delete the Downgrade Filter and add the Selection (Selection will be introduced later) in FetchHandleRDD.
- information about the used index is hard to know
  To make the information about the used index easy to find, we add the columns that make up the index after the index name.
As shown below, for the same table and query as in The Problem of DAG Explain section, the output now includes IndexRangeScan and TableRowIDScan, replaces the Downgrade Filter with the Selection in FetchHandleRDD, and lists the columns that make up the index after the index name.
== Physical Plan ==
*(1) ColumnarToRow
+- TiSpark RegionTaskExec{downgradeThreshold=1000000000,downgradeFilter=[[b@VARCHAR(255) GREATER_THAN "aa"], [a@LONG GREATER_THAN 0]]
+- RowToColumnar
+- TiKV FetchHandleRDD{[table: t1] IndexLookUp, Columns: a@LONG, b@VARCHAR(255), c@VARCHAR(255): { {IndexRangeScan(Index:testindex(a,b)): { RangeFilter: [[a@LONG GREATER_THAN 0]], Range: [([t\200\000\000\000\000\000\r-_i\200\000\000\000\000\000\000\001\003\200\000\000\000\000\000\000\001], [t\200\000\000\000\000\000\r-_i\200\000\000\000\000\000\000\001\372])] }, Selection: [[b@VARCHAR(255) GREATER_THAN "aa"]]}; {TableRowIDScan, Selection: [[b@VARCHAR(255) GREATER_THAN "aa"]]} }, startTs: 434247058345951234}
To make clear which expression(s) constitute the scanning range, we added a RangeFilter to the KeyRange to indicate the expression(s) used to construct the scan range.
RangeFilter: RangeFilter indicates which expression(s) the range is made up of. If RangeFilter is empty, it indicates a full table scan or full index scan. RangeFilter generally appears when the query involves an index range; in that case, the expressions in the RangeFilter form the scanned range from left to right.
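One way to picture "from left to right" is the usual leftmost-prefix rule for composite indexes: walk the index columns in order, take the conditions on each column into the range, and stop after the first column that has a non-equality condition or no condition at all. The Python sketch below implements that rule as an assumption. It is not taken from TiSpark's range builder, which may differ (for example for prefix indexes), and the function and type names are hypothetical. It does agree with the examples in this document, where index (a,b) with a>0 and b>'aa' yields RangeFilter [a>0] and Selection [b>'aa'].

```python
from typing import List, Sequence, Tuple

Condition = Tuple[str, str, object]  # (column, operator, value)


def split_conditions(
    index_columns: Sequence[str],
    conditions: List[Condition],
) -> Tuple[List[Condition], List[Condition]]:
    """Split conditions into (range_filter, selection) using the leftmost-prefix rule."""
    range_filter: List[Condition] = []
    for column in index_columns:
        on_column = [c for c in conditions if c[0] == column]
        if not on_column:
            break  # gap in the prefix: later columns cannot narrow the range
        range_filter.extend(on_column)
        if any(op != "=" for _, op, _ in on_column):
            break  # a non-equality condition ends the usable prefix
    selection = [c for c in conditions if c not in range_filter]
    return range_filter, selection


index = ["a", "b"]
print(split_conditions(index, [("a", ">", 0), ("b", ">", "aa")]))
print(split_conditions(index, [("a", "=", 0), ("b", ">", "aa")]))
print(split_conditions(index, [("b", ">", "aa")]))
```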
As shown below, for the same table and query as in The Problem of DAG Explain section, RangeFilter is added to the output.
== Physical Plan ==
*(1) ColumnarToRow
+- TiKV CoprocessorRDD{[table: t1] IndexReader, Columns: a@LONG: { IndexRangeScan(Index:testindex(a,b)): { RangeFilter: [[a@LONG GREATER_THAN 0]], Range: [([t\200\000\000\000\000\000\rH_i\200\000\000\000\000\000\000\001\003\200\000\000\000\000\000\000\001], [t\200\000\000\000\000\000\rH_i\200\000\000\000\000\000\000\001\372])] } }, startTs: 434247289531006977}
To solve the problem of confusing operator naming, we rename the operators to match TiDB. IndexScan has changed to IndexLookUp; CoveringIndexScan has changed to IndexReader; TableScan has changed to TableReader. The PushDown Filter has changed to Selection.
- TableReader: Aggregates the data obtained by the underlying operator TableRangeScan in TiKV.
- IndexReader: Aggregates the data obtained by the underlying operator IndexRangeScan in TiKV.
- IndexLookUp: First aggregates the RowIDs (in TiKV) returned by the first scan, on the index. Then, in the second scan, on the table, accurately reads the data from TiKV based on these RowIDs. The first scan uses the IndexRangeScan operator; the second scan uses the TableRowIDScan operator.
- Selection: The expression passed to COP/TiKV as the selection expression when no downgrade is triggered.
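The renaming can be kept in one place as a lookup table. The following Python sketch only restates the mapping described in this document; the dictionary and function names are hypothetical and do not correspond to identifiers in TiSpark.

```python
# Old TiSpark operator name -> new name aligned with TiDB.
OPERATOR_RENAMES = {
    "IndexScan": "IndexLookUp",
    "CoveringIndexScan": "IndexReader",
    "TableScan": "TableReader",
}

# Old filter name -> new name.
FILTER_RENAMES = {"PushDown Filter": "Selection"}

# Scans that run underneath each new operator, in execution order.
UNDERLYING_SCANS = {
    "IndexLookUp": ["IndexRangeScan", "TableRowIDScan"],
    "IndexReader": ["IndexRangeScan"],
    "TableReader": ["TableRangeScan"],
}


def new_plan_shape(old_operator: str) -> str:
    """Describe the renamed operator and the scans it is built from."""
    new_name = OPERATOR_RENAMES[old_operator]
    scans = " -> ".join(UNDERLYING_SCANS[new_name])
    return f"{new_name}: {scans}"


for old in OPERATOR_RENAMES:
    print(f"{old} => {new_plan_shape(old)}")
```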
- Table without clustered index
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
  - TableScan with Selection and without RangeFilter
    SELECT * FROM t1 where a>0 and b > 'aa'
  - TableScan with complex SQL statements
    select * from t1 where a>0 or b > 'aa' or c<'cc' and c>'aa' order by(c) limit(10)
- Table with clustered index
  TiDB versions earlier than 5.0:
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL, PRIMARY KEY (`a`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
  TiDB versions later than 5.0:
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL, PRIMARY KEY (`a`) clustered ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
  - TableScan with Selection and with RangeFilter
    SELECT * FROM t1 where a>0 and b > 'aa'
  - TableScan without Selection and with RangeFilter
    SELECT * FROM t1 where a>0
  - TableScan with Selection and with RangeFilter
    SELECT * FROM t1 where b>'aa'
- Table with clustered index and partition
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL, PRIMARY KEY (a) ) PARTITION BY RANGE (a) ( PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16), PARTITION p3 VALUES LESS THAN MAXVALUE )
  - TableScan with Selection and with RangeFilter with partition
    SELECT a,b FROM t1 where a>0 and b>'aa'
- Table with secondary index
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
  CREATE INDEX `testIndex` ON `t1` (`a`,`b`);
  - IndexScan with Selection and with RangeFilter
    SELECT * FROM t1 where a>0 and b > 'aa'
  - IndexScan without Selection and with RangeFilter
    SELECT * FROM t1 where a=0 and b > 'aa'
  - CoveringIndexScan with Selection and with RangeFilter
    SELECT a,b FROM t1 where a>0 and b > 'aa'
  - IndexScan with complex SQL statements
    SELECT max(c) FROM t1 where a>0 and c > 'cc' and c < 'bb' group by c order by(c)
  - CoveringIndexScan with complex SQL statements
    select sum(a) from t1 where a>0 and b > 'aa' or b<'cc' and a>0
- Table with secondary prefix index
  CREATE TABLE `t1` ( `a` BIGINT(20) NOT NULL, `b` varchar(255) NOT NULL, `c` varchar(255) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
  CREATE INDEX `testIndex` ON `t1` (`b`(4),a);
  - IndexScan with RangeFilter and with Selection
    SELECT * FROM t1 where a>0 and b > 'aa'
  - IndexScan with RangeFilter and without Selection
    SELECT * FROM t1 where b > 'aa'
  - IndexScan with complex SQL statements
    select a from t1 where a>0 and b > 'aa' or c<'cc' and c>'aa' order by(c) limit(10) group by a