Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix index push down #697

Merged
merged 8 commits into from May 8, 2019

fix index statistics

  • Loading branch information...
birdstorm committed May 7, 2019
commit 1f07e821f4d1f94918296eac4fdb24808ca6f924
@@ -19,12 +19,12 @@ package com.pingcap.tispark.statistics

import com.google.common.primitives.UnsignedLong
import com.pingcap.tikv.expression.{ByItem, ColumnRef, ComparisonBinaryExpression, Constant}
import com.pingcap.tikv.key.{Key, TypedKey}
import com.pingcap.tikv.key.Key
import com.pingcap.tikv.meta.TiDAGRequest.PushDownType
import com.pingcap.tikv.meta.{TiDAGRequest, TiTableInfo, TiTimestamp}
import com.pingcap.tikv.meta.{TiColumnInfo, TiDAGRequest, TiIndexInfo, TiTableInfo, TiTimestamp}
import com.pingcap.tikv.row.Row
import com.pingcap.tikv.statistics._
import com.pingcap.tikv.types.{DataType, DataTypeFactory, MySQLType}
import com.pingcap.tikv.types.BytesType
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
@@ -77,27 +77,37 @@ object StatisticsHelper {
val histVer = row.getUnsignedLong(5)
val cMSketch = if (checkColExists(histTable, "cm_sketch")) row.getBytes(6) else null
// get index/col info for StatisticsDTO
val indexInfos = table.getIndices
.filter { _.getId == histID }
var indexInfos: mutable.Buffer[TiIndexInfo] = mutable.Buffer.empty[TiIndexInfo]

val colInfos = table.getColumns
.filter { _.getId == histID }
var colInfos: mutable.Buffer[TiColumnInfo] = mutable.Buffer.empty[TiColumnInfo]

var needed = true

// we should only query those columns that user specified before
if (!loadAll && !neededColIds.contains(histID)) needed = false

val indexFlag = if (isIndex) 1 else 0
var dataType: DataType = DataTypeFactory.of(MySQLType.TypeBlob)
// Columns info found
if (!isIndex && colInfos.nonEmpty) {
dataType = colInfos.head.getType
} else if (indexInfos.isEmpty) {
logger.warn(
s"Cannot find histogram id $histID in table info ${table.getName}[${table.getId}] now. It may be deleted."
)
needed = false
val (indexFlag, dataType) = if (isIndex) {
indexInfos = table.getIndices.filter { _.getId == histID }
if (indexInfos.isEmpty) {
logger.warn(
s"Cannot find index histogram id $histID in table info ${table.getName}[${table.getId}] now. It may be deleted."
)
needed = false
(1, null)
} else {
(1, BytesType.BLOB)
}
} else {
colInfos = table.getColumns.filter { _.getId == histID }
if (colInfos.isEmpty) {
logger.warn(
s"Cannot find column histogram id $histID in table info ${table.getName}[${table.getId}] now. It may be deleted."
)
needed = false
(0, null)
} else {
(0, colInfos.head.getType)
}
}

if (needed) {
@@ -158,8 +168,8 @@ object StatisticsHelper {
var lowerBound: Key = null
var upperBound: Key = null
// all bounds are stored as blob in bucketTable currently, decode using blob type
lowerBound = TypedKey.toTypedKey(row.getBytes(6), DataTypeFactory.of(MySQLType.TypeBlob))
upperBound = TypedKey.toTypedKey(row.getBytes(7), DataTypeFactory.of(MySQLType.TypeBlob))
lowerBound = Key.toRawKey(row.getBytes(6))
upperBound = Key.toRawKey(row.getBytes(7))
totalCount += count
buckets += new Bucket(totalCount, repeats, lowerBound, upperBound)
}
@@ -46,7 +46,7 @@ class StatisticsManagerSuite extends BaseTiSparkSuite {
StatisticsManager.loadStatisticsInfo(fDataIdxTbl)
}

ignore("Test fixed table size estimation") {
test("Test fixed table size estimation") {
tidbStmt.execute("DROP TABLE IF EXISTS `tb_fixed_float`")
tidbStmt.execute("DROP TABLE IF EXISTS `tb_fixed_int`")
tidbStmt.execute("DROP TABLE IF EXISTS `tb_fixed_time`")
@@ -102,7 +102,7 @@ class StatisticsManagerSuite extends BaseTiSparkSuite {
assert(timeBytes >= 19 * 2)
}

ignore("select count(1) from full_data_type_table_idx where tp_int = 2006469139 or tp_int < 0") {
test("select count(1) from full_data_type_table_idx where tp_int = 2006469139 or tp_int < 0") {
val indexes = fDataIdxTbl.getIndices
val idx = indexes.filter(_.getIndexColumns.asScala.exists(_.matchName("tp_int"))).head

@@ -115,7 +115,7 @@ class StatisticsManagerSuite extends BaseTiSparkSuite {
testSelectRowCount(expressions, idx, 46)
}

ignore(
test(
"select tp_int from full_data_type_table_idx where tp_int < 5390653 and tp_int > -46759812"
) {
val indexes = fDataIdxTbl.getIndices
@@ -144,7 +144,7 @@ class StatisticsManagerSuite extends BaseTiSparkSuite {
assert(rc == expectedCount)
}

val indexSelectionCases = Map(
private val indexSelectionCases = Map(
// double read case
"select tp_bigint, tp_real from full_data_type_table_idx where tp_int = 2333" -> "idx_tp_int",
"select * from full_data_type_table_idx where id_dt = 2333" -> "",
@@ -157,7 +157,7 @@ class StatisticsManagerSuite extends BaseTiSparkSuite {
indexSelectionCases.foreach((t: (String, String)) => {
val query = t._1
val idxName = t._2
ignore(query) {
test(query) {
val executedPlan = spark.sql(query).queryExecution.executedPlan
val usedIdxName = {
if (isDoubleRead(executedPlan)) {
@@ -168,6 +168,8 @@ class StatisticsManagerSuite extends BaseTiSparkSuite {
extractUsedIndex(coprocessorRDD)
}
}
spark.sql(query).show()
println(usedIdxName, idxName)
assert(usedIdxName.equals(idxName))
}
})
@@ -17,39 +17,21 @@

import static com.pingcap.tikv.util.KeyRangeUtils.makeCoprocRange;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.pingcap.tikv.meta.TiIndexInfo;
import com.pingcap.tikv.predicates.IndexRange;
import org.tikv.kvproto.Coprocessor.KeyRange;

// IndexScanKeyRangeBuilder accepts a table id, a index info, and a index range.
// With these info, it can build a keyrange which can be used for index scan.
// With these info, it can build a key range which can be used for index scan.
// TODO: more refactoring on the way
public class IndexScanKeyRangeBuilder {
public class IndexScanKeyRangeBuilder extends KeyRangeBuilder {
private final long id;
private final TiIndexInfo index;
private final IndexRange ir;
private final Key pointKey;
private Key lPointKey;
private Key uPointKey;
private Key lKey;
private Key uKey;

public IndexScanKeyRangeBuilder(long id, TiIndexInfo index, IndexRange ir) {
super(ir);
this.id = id;
this.index = index;
this.ir = ir;
pointKey = ir.hasAccessKey() ? ir.getAccessKey() : Key.EMPTY;
}

private KeyRange computeWithOutRange() {
lPointKey = pointKey;
uPointKey = pointKey.nextPrefix();

lKey = Key.EMPTY;
uKey = Key.EMPTY;
return toPairKey();
}

private KeyRange toPairKey() {
@@ -58,38 +40,8 @@ private KeyRange toPairKey() {
return makeCoprocRange(lbsKey.toByteString(), ubsKey.toByteString());
}

private KeyRange computeWithRange() {
Range<TypedKey> range = ir.getRange();
lPointKey = pointKey;
uPointKey = pointKey;

if (!range.hasLowerBound()) {
// -INF
lKey = Key.NULL;
} else {
lKey = range.lowerEndpoint();
if (range.lowerBoundType().equals(BoundType.OPEN)) {
lKey = lKey.nextPrefix();
}
}

if (!range.hasUpperBound()) {
// INF
uKey = Key.MAX;
} else {
uKey = range.upperEndpoint();
if (range.upperBoundType().equals(BoundType.CLOSED)) {
uKey = uKey.nextPrefix();
}
}
return toPairKey();
}

public KeyRange compute() {
if (!ir.hasRange()) {
return computeWithOutRange();
} else {
return computeWithRange();
}
computeKeyRange();
return toPairKey();
}
}
@@ -18,6 +18,7 @@
import static com.pingcap.tikv.codec.KeyUtils.formatBytes;
import static java.util.Objects.requireNonNull;

import com.google.common.primitives.Bytes;
import com.google.protobuf.ByteString;
import com.pingcap.tikv.codec.CodecDataOutput;
import com.pingcap.tikv.types.DataType;
@@ -171,6 +172,13 @@ public boolean equals(Object other) {
}
}

public Key append(Key other) {
if (other == null) {
return this;
}
return Key.toRawKey(Bytes.concat(getBytes(), other.getBytes()));
}

@Override
public int hashCode() {
return Arrays.hashCode(value) * infFlag;
@@ -0,0 +1,77 @@
/*
* Copyright 2019 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pingcap.tikv.key;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.pingcap.tikv.predicates.IndexRange;

class KeyRangeBuilder {
This conversation was marked as resolved by birdstorm

This comment has been minimized.

Copy link
@zhexuany

zhexuany May 8, 2019

Member

This class can be an abstract class with abstract Pair<Key, Key> compute() method. The benefit is we cannot directly use KeyRangeBuilder but use a concrete one.

This comment has been minimized.

Copy link
@birdstorm

birdstorm May 8, 2019

Author Member

we cannot use abstract Pair<Key, Key> compute() because compute() returns different types in different classes extended

This comment has been minimized.

Copy link
@birdstorm

birdstorm May 8, 2019

Author Member

I can make KeyRangeBuilder abstract though.

This comment has been minimized.

Copy link
@zhexuany

zhexuany May 8, 2019

Member

that works too.

This comment has been minimized.

Copy link
@birdstorm

birdstorm May 8, 2019

Author Member

done.


private final IndexRange ir;
private final Key pointKey;
Key lPointKey;
This conversation was marked as resolved by zhexuany

This comment has been minimized.

Copy link
@zhexuany

zhexuany May 8, 2019

Member

private or protected?

This comment has been minimized.

Copy link
@birdstorm

birdstorm May 8, 2019

Author Member

package private

Key uPointKey;
Key lKey;
Key uKey;

KeyRangeBuilder(IndexRange ir) {
this.ir = ir;
pointKey = ir.hasAccessKey() ? Key.toRawKey(ir.getAccessKey().getBytes()) : Key.EMPTY;
}

private void computeWithOutRange() {
lPointKey = pointKey;
uPointKey = pointKey.nextPrefix();

lKey = Key.EMPTY;
uKey = Key.EMPTY;
}

private void computeWithRange() {
Range<TypedKey> range = ir.getRange();
lPointKey = pointKey;
uPointKey = pointKey;

if (!range.hasLowerBound()) {
// -INF
lKey = Key.NULL;
} else {
lKey = Key.toRawKey(range.lowerEndpoint().getBytes());
if (range.lowerBoundType().equals(BoundType.OPEN)) {
lKey = lKey.nextPrefix();
}
}

if (!range.hasUpperBound()) {
// INF
uKey = Key.MAX;
} else {
uKey = Key.toRawKey(range.upperEndpoint().getBytes());
if (range.upperBoundType().equals(BoundType.CLOSED)) {
uKey = uKey.nextPrefix();
}
}
}

void computeKeyRange() {
if (!ir.hasRange()) {
computeWithOutRange();
} else {
computeWithRange();
}
}
}
@@ -0,0 +1,37 @@
/*
* Copyright 2019 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.pingcap.tikv.key;

import com.pingcap.tikv.predicates.IndexRange;
import com.pingcap.tikv.util.Pair;

public class StatisticsKeyRangeBuilder extends KeyRangeBuilder {

public StatisticsKeyRangeBuilder(IndexRange ir) {
super(ir);
}

private Pair<Key, Key> toPairKey() {
Key lbsKey = Key.toRawKey(lPointKey.append(lKey).getBytes());
Key ubsKey = Key.toRawKey(uPointKey.append(uKey).getBytes());
return new Pair<>(lbsKey, ubsKey);
}

public Pair<Key, Key> compute() {
computeKeyRange();
return toPairKey();
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.