Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix Request range exceeds bound error for partition table (#43146) #43179

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import (
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikv"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -3354,6 +3355,10 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
if len(partitions) == 0 {
return &TableDualExec{baseExecutor: *ret.base()}
}

slices.SortFunc(partitions, func(a, b table.PhysicalTable) bool {
return a.GetPhysicalID() < b.GetPhysicalID()
})
ret.kvRangeBuilder = kvRangeBuilderFromRangeAndPartition{
sctx: b.ctx,
partitions: partitions,
Expand Down Expand Up @@ -3473,6 +3478,9 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table
usedPartition = append(usedPartition, p)
}
}
slices.SortFunc(usedPartition, func(a, b table.PhysicalTable) bool {
return a.GetPhysicalID() < b.GetPhysicalID()
})
return usedPartition, true, contentPos, nil
}

Expand Down Expand Up @@ -4053,6 +4061,9 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = append(tmp, kvRanges...)
}
}
slices.SortFunc(kvRanges, func(a, b kv.KeyRange) bool {
return bytes.Compare(a.StartKey, b.StartKey) < 0
})
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

Expand Down Expand Up @@ -4083,6 +4094,9 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = append(kvRanges, tmp...)
}
}
slices.SortFunc(kvRanges, func(a, b kv.KeyRange) bool {
return bytes.Compare(a.StartKey, b.StartKey) < 0
})
return builder.buildTableReaderFromKvRanges(ctx, e, kvRanges)
}

Expand Down
8 changes: 8 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -303,6 +304,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var builder distsql.RequestBuilder
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down Expand Up @@ -608,6 +612,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
}

// init kvReq, result and worker for this partition
// The key ranges should be ordered.
slices.SortFunc(kvRange, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
kvReq, err := builder.SetKeyRanges(kvRange).Build()
if err != nil {
worker.syncErr(err)
Expand Down
31 changes: 31 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -418,3 +419,33 @@ func TestPartitionTableIndexJoinIndexLookUp(t *testing.T) {
tk.MustQuery("select /*+ TIDB_INLJ(t1, t2) */ t1.* from t t1, t t2 use index(a) where t1.a=t2.b and " + cond).Sort().Check(result)
}
}

func TestPartitionTableRangeRequestOrdered(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/checkKeyRangeSorted", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/checkKeyRangeSorted"))
}()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("CREATE TABLE tp (" +
"id int, c varchar(128), " +
"key c(c))" +
"partition by range (id) (" +
"partition p0 values less than (10), " +
"partition p1 values less than MAXVALUE" +
")")
tk.MustExec("alter table tp truncate partition p0")
tk.MustExec("insert into tp values (1, 'a'), (100, 'b')")
tk.MustExec("analyze table tp")
tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
tk.MustQuery("select * from tp order by id asc").Check(testkit.Rows("1 a", "100 b"))
tk.MustQuery("select /*+ use_index(tp, c) */ * from tp order by c asc").Check(testkit.Rows("1 a", "100 b"))
tk.MustQuery("select /*+ use_index(tp, c) */ c from tp order by c asc").Check(testkit.Rows("a", "b"))
tk.MustQuery("select count(*) from tp").Check(testkit.Rows("2"))
tk.MustQuery("select /*+ inl_join(t1, t2)*/ * from tp t1, tp t2 where t1.c=t2.c order by t1.id asc").Check(testkit.Rows("1 a 1 a", "100 b 100 b"))
tk.MustQuery("select /*+ hash_join(t1, t2)*/ * from tp t1, tp t2 where t1.c=t2.c order by t1.id asc").Check(testkit.Rows("1 a 1 a", "100 b 100 b"))
tk.MustQuery("select /*+ inl_hash_join(t1, t2)*/ * from tp t1, tp t2 where t1.c=t2.c order by t1.id asc").Check(testkit.Rows("1 a 1 a", "100 b 100 b"))
}
5 changes: 5 additions & 0 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -321,6 +322,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
}

// init kvReq and worker for this partition
// The key ranges should be ordered.
slices.SortFunc(keyRange, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
kvReq, err := builder.SetKeyRanges(keyRange).Build()
if err != nil {
worker.syncErr(e.resultCh, err)
Expand Down
5 changes: 5 additions & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package executor

import (
"bytes"
"context"
"sort"
"time"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/exp/slices"
)

// make sure `TableReaderExecutor` implements `Executor`.
Expand Down Expand Up @@ -305,6 +307,9 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
if err != nil {
return nil, err
}
slices.SortFunc(kvReq.KeyRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)

result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
Expand Down
11 changes: 11 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package copr

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -52,6 +53,7 @@ import (
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var coprCacheHistogramEvict = tidbmetrics.DistSQLCoprCacheHistogram.WithLabelValues("evict")
Expand All @@ -71,6 +73,15 @@ type CopClient struct {

// Send builds the request and gets the coprocessor iterator response.
func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interface{}, option *kv.ClientSendOption) kv.Response {
failpoint.Inject("checkKeyRangeSorted", func(_ failpoint.Value) {
isSorted := slices.IsSortedFunc(req.KeyRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
if !isSorted {
logutil.BgLogger().Fatal("distsql request key range not sorted!")
}
})

eventCb := option.EventCb
enabledRateLimitAction := option.EnabledRateLimitAction
sessionMemTracker := option.SessionMemTracker
Expand Down