Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement tidb_bounded_staleness built-in function #24328

Merged
merged 20 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions executor/show_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,9 +1102,10 @@ func (s *testSuite5) TestShowBuiltin(c *C) {
res := tk.MustQuery("show builtins;")
c.Assert(res, NotNil)
rows := res.Rows()
c.Assert(268, Equals, len(rows))
const builtinFuncNum = 269
c.Assert(builtinFuncNum, Equals, len(rows))
c.Assert("abs", Equals, rows[0][0].(string))
c.Assert("yearweek", Equals, rows[267][0].(string))
c.Assert("yearweek", Equals, rows[builtinFuncNum-1][0].(string))
}

func (s *testSuite5) TestShowClusterConfig(c *C) {
Expand Down
4 changes: 3 additions & 1 deletion expression/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,9 @@ var funcs = map[string]functionClass{
ast.Year: &yearFunctionClass{baseFunctionClass{ast.Year, 1, 1}},
ast.YearWeek: &yearWeekFunctionClass{baseFunctionClass{ast.YearWeek, 1, 2}},
ast.LastDay: &lastDayFunctionClass{baseFunctionClass{ast.LastDay, 1, 1}},
// TSO functions
ast.TiDBBoundedStaleness: &tidbBoundedStalenessFunctionClass{baseFunctionClass{ast.TiDBBoundedStaleness, 2, 2}},
ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}},

// string functions
ast.ASCII: &asciiFunctionClass{baseFunctionClass{ast.ASCII, 1, 1}},
Expand Down Expand Up @@ -881,7 +884,6 @@ var funcs = map[string]functionClass{
// This function is used to show tidb-server version info.
ast.TiDBVersion: &tidbVersionFunctionClass{baseFunctionClass{ast.TiDBVersion, 0, 0}},
ast.TiDBIsDDLOwner: &tidbIsDDLOwnerFunctionClass{baseFunctionClass{ast.TiDBIsDDLOwner, 0, 0}},
ast.TiDBParseTso: &tidbParseTsoFunctionClass{baseFunctionClass{ast.TiDBParseTso, 1, 1}},
ast.TiDBDecodePlan: &tidbDecodePlanFunctionClass{baseFunctionClass{ast.TiDBDecodePlan, 1, 1}},

// TiDB Sequence function.
Expand Down
111 changes: 111 additions & 0 deletions expression/builtin_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -7113,3 +7114,113 @@ func handleInvalidZeroTime(ctx sessionctx.Context, t types.Time) (bool, error) {
}
return true, handleInvalidTimeError(ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, t.String()))
}

// tidbBoundedStalenessFunctionClass reads a time window [a, b] and compares it with the latest SafeTS
// to determine which TS to use in a read only transaction.
type tidbBoundedStalenessFunctionClass struct {
baseFunctionClass
}

func (c *tidbBoundedStalenessFunctionClass) getFunction(ctx sessionctx.Context, args []Expression) (builtinFunc, error) {
if err := c.verifyArgs(args); err != nil {
return nil, err
}
bf, err := newBaseBuiltinFuncWithTp(ctx, c.funcName, args, types.ETInt, types.ETDatetime, types.ETDatetime)
if err != nil {
return nil, err
}
sig := &builtinTiDBBoundedStalenessSig{bf}
return sig, nil
}

type builtinTiDBBoundedStalenessSig struct {
baseBuiltinFunc
}

func (b *builtinTiDBBoundedStalenessSig) Clone() builtinFunc {
newSig := &builtinTidbParseTsoSig{}
newSig.cloneFrom(&b.baseBuiltinFunc)
return newSig
}

func (b *builtinTiDBBoundedStalenessSig) evalInt(row chunk.Row) (int64, bool, error) {
leftTime, isNull, err := b.args[0].EvalTime(b.ctx, row)
if isNull || err != nil {
return 0, true, handleInvalidTimeError(b.ctx, err)
}
rightTime, isNull, err := b.args[1].EvalTime(b.ctx, row)
if isNull || err != nil {
return 0, true, handleInvalidTimeError(b.ctx, err)
}
if invalidLeftTime, invalidRightTime := leftTime.InvalidZero(), rightTime.InvalidZero(); invalidLeftTime || invalidRightTime {
if invalidLeftTime {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, leftTime.String()))
}
if invalidRightTime {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, rightTime.String()))
}
return 0, true, err
}
minTime, err := leftTime.GoTime(getTimeZone(b.ctx))
if err != nil {
return 0, true, err
}
maxTime, err := rightTime.GoTime(getTimeZone(b.ctx))
if err != nil {
return 0, true, err
}
// Make sure the time is not too big or small to prevent it from overflow later.
if !(checkTimeRange(minTime) && checkTimeRange(maxTime)) {
return 0, true, nil
}
if minTime.After(maxTime) {
return 0, true, nil
}
minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0)
return calAppropriateTS(minTS, maxTS, getMinSafeTS(b.ctx)), false, nil
}

func checkTimeRange(t time.Time) bool {
unixT := t.Unix()
unixTMillisecond := unixT * 1000
// Less than the unix timestamp zero or overflow after * 1000.
if unixT < 0 || unixTMillisecond < 0 {
return false
}
// Overflow after being composed to TS
if oracle.ComposeTS(unixTMillisecond, 0) < uint64(unixTMillisecond) {
return false
}
return true
}

func getMinSafeTS(sessionCtx sessionctx.Context) (minSafeTS uint64) {
if store := sessionCtx.GetStore(); store != nil {
minSafeTS = store.GetMinSafeTS(sessionCtx.GetSessionVars().CheckAndGetTxnScope())
}
// Inject mocked SafeTS for test.
failpoint.Inject("injectSafeTS", func(val failpoint.Value) {
injectTS := val.(int)
minSafeTS = uint64(injectTS)
})
// Try to get from the stmt cache to make sure this function is deterministic.
stmtCtx := sessionCtx.GetSessionVars().StmtCtx
minSafeTS = stmtCtx.GetOrStoreStmtCache(stmtctx.StmtSafeTSCacheKey, minSafeTS).(uint64)
return
}

// For a SafeTS t and a time range [t1, t2]:
// 1. If t < t1, we will use t1 as the result,
// and with it, a read request may fail because it's an unreached SafeTS.
// 2. If t1 <= t <= t2, we will use t as the result, and with it,
// a read request won't fail.
// 2. If t2 < t, we will use t2 as the result,
// and with it, a read request won't fail because it's bigger than the latest SafeTS.
func calAppropriateTS(minTS, maxTS, minSafeTS uint64) int64 {
if minSafeTS < minTS {
return int64(minTS)
} else if minTS <= minSafeTS && minSafeTS <= maxTS {
return int64(minSafeTS)
}
return int64(maxTS)
}
107 changes: 106 additions & 1 deletion expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@
package expression

import (
"fmt"
"math"
"strings"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mock"
Expand Down Expand Up @@ -804,7 +807,7 @@ func (s *testEvaluatorSuite) TestTime(c *C) {
}

func resetStmtContext(ctx sessionctx.Context) {
ctx.GetSessionVars().StmtCtx.ResetNowTs()
ctx.GetSessionVars().StmtCtx.ResetStmtCache()
}

func (s *testEvaluatorSuite) TestNowAndUTCTimestamp(c *C) {
Expand Down Expand Up @@ -2854,6 +2857,108 @@ func (s *testEvaluatorSuite) TestTidbParseTso(c *C) {
}
}

func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) {
const timeParserLayout = "2006-01-02 15:04:05.000"
t1, err := time.Parse(timeParserLayout, "2015-09-21 09:53:04.877")
c.Assert(err, IsNil)
t1Str := t1.Format(timeParserLayout)
ts1 := int64(oracle.ComposeTS(t1.Unix()*1000, 0))
t2 := time.Now().UTC()
t2Str := t2.Format(timeParserLayout)
ts2 := int64(oracle.ComposeTS(t2.Unix()*1000, 0))
s.ctx.GetSessionVars().TimeZone = time.UTC
tests := []struct {
leftTime interface{}
rightTime interface{}
injectSafeTS uint64
isNull bool
expect int64
}{
// SafeTS is in the range.
{
leftTime: t1Str,
rightTime: t2Str,
injectSafeTS: func() uint64 {
phy := t2.Add(-1*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
}(),
isNull: false,
expect: func() int64 {
phy := t2.Add(-1*time.Second).Unix() * 1000
return int64(oracle.ComposeTS(phy, 0))
}(),
},
// SafeTS is less than the left time.
{
leftTime: t1Str,
rightTime: t2Str,
injectSafeTS: func() uint64 {
phy := t1.Add(-1*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
}(),
isNull: false,
expect: ts1,
},
// SafeTS is bigger than the right time.
{
leftTime: t1Str,
rightTime: t2Str,
injectSafeTS: func() uint64 {
phy := t2.Add(time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
}(),
isNull: false,
expect: ts2,
},
// Wrong time order.
{
leftTime: t2Str,
rightTime: t1Str,
injectSafeTS: 0,
isNull: true,
expect: 0,
},
}

fc := funcs[ast.TiDBBoundedStaleness]
for _, test := range tests {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", test.injectSafeTS)), IsNil)
f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(test.leftTime), types.NewDatum(test.rightTime)}))
c.Assert(err, IsNil)
d, err := evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
if test.isNull {
c.Assert(d.IsNull(), IsTrue)
} else {
c.Assert(d.GetInt64(), Equals, test.expect)
}
resetStmtContext(s.ctx)
}

// Test whether it's deterministic.
safeTS1 := oracle.ComposeTS(t2.Add(-1*time.Second).Unix()*1000, 0)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", safeTS1)), IsNil)
f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
c.Assert(err, IsNil)
d, err := evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
c.Assert(d.GetInt64(), Equals, int64(safeTS1))
// SafeTS updated.
safeTS2 := oracle.ComposeTS(t2.Add(1*time.Second).Unix()*1000, 0)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", safeTS2)), IsNil)
f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
c.Assert(err, IsNil)
d, err = evalBuiltinFunc(f, chunk.Row{})
c.Assert(err, IsNil)
// Still safeTS1
c.Assert(d.GetInt64(), Equals, int64(safeTS1))
resetStmtContext(s.ctx)
failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")
}

func (s *testEvaluatorSuite) TestGetIntervalFromDecimal(c *C) {
du := baseDateArithmitical{}

Expand Down
67 changes: 67 additions & 0 deletions expression/builtin_time_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,73 @@ func (b *builtinTidbParseTsoSig) vecEvalTime(input *chunk.Chunk, result *chunk.C
return nil
}

func (b *builtinTiDBBoundedStalenessSig) vectorized() bool {
return true
}

func (b *builtinTiDBBoundedStalenessSig) vecEvalInt(input *chunk.Chunk, result *chunk.Column) error {
n := input.NumRows()
buf0, err := b.bufAllocator.get(types.ETDatetime, n)
if err != nil {
return err
}
defer b.bufAllocator.put(buf0)
if err = b.args[0].VecEvalTime(b.ctx, input, buf0); err != nil {
return err
}
buf1, err := b.bufAllocator.get(types.ETDatetime, n)
if err != nil {
return err
}
defer b.bufAllocator.put(buf1)
if err = b.args[1].VecEvalTime(b.ctx, input, buf1); err != nil {
return err
}
result.ResizeInt64(n, false)
result.MergeNulls(buf0, buf1)
i64s := result.Int64s()
args0 := buf0.Times()
args1 := buf1.Times()
minSafeTS := getMinSafeTS(b.ctx)
for i := 0; i < n; i++ {
if result.IsNull(i) {
continue
}
if invalidArg0, invalidArg1 := args0[i].InvalidZero(), args1[i].InvalidZero(); invalidArg0 || invalidArg1 {
if invalidArg0 {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args0[i].String()))
}
if invalidArg1 {
err = handleInvalidTimeError(b.ctx, types.ErrWrongValue.GenWithStackByArgs(types.DateTimeStr, args1[i].String()))
}
if err != nil {
return err
}
result.SetNull(i, true)
continue
}
minTime, err := args0[i].GoTime(getTimeZone(b.ctx))
if err != nil {
return err
}
maxTime, err := args1[i].GoTime(getTimeZone(b.ctx))
if err != nil {
return err
}
if !(checkTimeRange(minTime) && checkTimeRange(maxTime)) {
result.SetNull(i, true)
continue
}
if minTime.After(maxTime) {
result.SetNull(i, true)
continue
}
minTS, maxTS := oracle.ComposeTS(minTime.Unix()*1000, 0), oracle.ComposeTS(maxTime.Unix()*1000, 0)
i64s[i] = calAppropriateTS(minTS, maxTS, minSafeTS)
}
return nil
}

func (b *builtinFromDaysSig) vectorized() bool {
return true
}
Expand Down
7 changes: 7 additions & 0 deletions expression/builtin_time_vec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,13 @@ var vecBuiltinTimeCases = map[string][]vecExprBenchCase{
geners: []dataGenerator{newRangeInt64Gener(0, math.MaxInt64)},
},
},
// Todo: how to inject the safeTS for better testing.
ast.TiDBBoundedStaleness: {
{
retEvalType: types.ETInt,
childrenTypes: []types.EvalType{types.ETDatetime, types.ETDatetime},
},
},
ast.LastDay: {
{retEvalType: types.ETDatetime, childrenTypes: []types.EvalType{types.ETDatetime}},
},
Expand Down
3 changes: 2 additions & 1 deletion expression/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
Expand Down Expand Up @@ -155,5 +156,5 @@ func getStmtTimestamp(ctx sessionctx.Context) (time.Time, error) {
return time.Unix(timestamp, 0), nil
}
stmtCtx := ctx.GetSessionVars().StmtCtx
return stmtCtx.GetNowTsCached(), nil
return stmtCtx.GetOrStoreStmtCache(stmtctx.StmtNowTsCacheKey, time.Now()).(time.Time), nil
}
Loading