Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

oracle: simplify timestamp utilities #24688

Merged
merged 14 commits into from
May 19, 2021
4 changes: 2 additions & 2 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,9 +1218,9 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
failpoint.Inject("mockUpdateCachedSafePoint", func(val failpoint.Value) {
if val.(bool) {
// 18 is for the logical time.
disksing marked this conversation as resolved.
Show resolved Hide resolved
ts := oracle.GetPhysical(time.Now()) << 18
ts := oracle.GoTimeToTS(time.Now())
s := reorg.d.store.(tikv.Storage)
s.UpdateSPCache(uint64(ts), time.Now())
s.UpdateSPCache(ts, time.Now())
time.Sleep(time.Millisecond * 3)
}
})
Expand Down
4 changes: 2 additions & 2 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (*testSuite) TestT(c *C) {
c.Assert(dd, NotNil)
c.Assert(dd.GetLease(), Equals, 80*time.Millisecond)

snapTS := oracle.EncodeTSO(oracle.GetPhysical(time.Now()))
snapTS := oracle.GoTimeToTS(time.Now())
cs := &ast.CharsetOpt{
Chs: "utf8",
Col: "utf8_bin",
Expand Down Expand Up @@ -317,7 +317,7 @@ func (*testSuite) TestT(c *C) {
c.Assert(err, IsNil)

// for GetSnapshotInfoSchema
currSnapTS := oracle.EncodeTSO(oracle.GetPhysical(time.Now()))
currSnapTS := oracle.GoTimeToTS(time.Now())
currSnapIs, err := dom.GetSnapshotInfoSchema(currSnapTS)
c.Assert(err, IsNil)
c.Assert(currSnapIs, NotNil)
Expand Down
2 changes: 1 addition & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
return
}
now := time.Unix(0, oracle.ExtractPhysical(currentVer.Ver)*1e6)
now := oracle.GetTimeFromTS(currentVer.Ver)
startTSLowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse)

minStartTS := oracle.GoTimeToTS(now)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2728,7 +2728,7 @@ func (s *testSuiteP2) TestHistoryRead(c *C) {
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 <nil>", "4 <nil>", "8 8", "9 9"))
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2", "4"))
tsoStr := strconv.FormatUint(oracle.EncodeTSO(snapshotTime.UnixNano()/int64(time.Millisecond)), 10)
tsoStr := strconv.FormatUint(oracle.GoTimeToTS(snapshotTime), 10)

tk.MustExec("set @@tidb_snapshot = '" + tsoStr + "'")
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2", "4"))
Expand Down
3 changes: 1 addition & 2 deletions executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -203,7 +202,7 @@ func (e *ShowExec) histogramToRow(dbName, tblName, partitionName, colName string
}

func (e *ShowExec) versionToTime(version uint64) types.Time {
t := time.Unix(0, oracle.ExtractPhysical(version)*int64(time.Millisecond))
t := oracle.GetTimeFromTS(version)
return types.NewTime(types.FromGoTime(t), mysql.TypeDatetime, 0)
}

Expand Down
4 changes: 2 additions & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.ComposeTS(gt.Unix()*1000, 0)
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
case ast.TimestampBoundExactStaleness:
// TODO: support funcCallExpr in future
Expand Down Expand Up @@ -667,7 +667,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.ComposeTS(gt.Unix()*1000, 0)
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
}
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
Expand Down
12 changes: 4 additions & 8 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,15 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectSafeTS: func() uint64 {
phy := time.Now().Add(-10*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectSafeTS: func() uint64 {
phy := time.Now().Add(-20*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
},
Expand All @@ -286,8 +284,7 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
phy := time.Now().Add(-10*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
},
Expand All @@ -298,8 +295,7 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
phy := time.Now().Add(-20*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
},
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ func (s *testStatsSuite) TestNeedAnalyzeTable(c *C) {
}{
// table was never analyzed and has reach the limit
{
tbl: &statistics.Table{Version: oracle.EncodeTSO(oracle.GetPhysical(time.Now()))},
tbl: &statistics.Table{Version: oracle.GoTimeToTS(time.Now())},
limit: 0,
ratio: 0,
start: "00:00 +0800",
Expand All @@ -1464,7 +1464,7 @@ func (s *testStatsSuite) TestNeedAnalyzeTable(c *C) {
},
// table was never analyzed but has not reach the limit
{
tbl: &statistics.Table{Version: oracle.EncodeTSO(oracle.GetPhysical(time.Now()))},
tbl: &statistics.Table{Version: oracle.GoTimeToTS(time.Now())},
limit: time.Hour,
ratio: 0,
start: "00:00 +0800",
Expand Down
6 changes: 3 additions & 3 deletions store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
strconv.FormatUint(now, 10))
c.Assert(err, IsNil)
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"),
strconv.FormatUint(now-oracle.EncodeTSO(20000), 10))
strconv.FormatUint(now-oracle.ComposeTS(20000, 0), 10))
c.Assert(err, IsNil)
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.EncodeTSO(10000))
c.Assert(sp, Equals, now-oracle.EncodeTSO(20000)-1)
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.ComposeTS(10000, 0))
c.Assert(sp, Equals, now-oracle.ComposeTS(20000, 0)-1)
}

func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64

func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error {
// Amend txn with current time first, then we can make sure we have another SafeWindow time to commit
currentTS := oracle.EncodeTSO(int64(time.Since(c.txn.startTime)/time.Millisecond)) + c.startTS
currentTS := oracle.ComposeTS(int64(time.Since(c.txn.startTime)/time.Millisecond), 0) + c.startTS
_, _, err := c.checkSchemaValid(ctx, currentTS, c.txn.schemaVer, true)
if err != nil {
logutil.Logger(ctx).Info("Schema changed for async commit txn",
Expand All @@ -1442,7 +1442,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error {
}

safeWindow := config.GetGlobalConfig().TiKVClient.AsyncCommit.SafeWindow
maxCommitTS := oracle.EncodeTSO(int64(safeWindow/time.Millisecond)) + currentTS
maxCommitTS := oracle.ComposeTS(int64(safeWindow/time.Millisecond), 0) + currentTS
logutil.BgLogger().Debug("calculate MaxCommitTS",
zap.Time("startTime", c.txn.startTime),
zap.Duration("safeWindow", safeWindow),
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/latch/latch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) {
func (s *testLatchSuite) TestRecycle(c *C) {
latches := NewLatches(8)
now := time.Now()
startTS := oracle.ComposeTS(oracle.GetPhysical(now), 0)
startTS := oracle.GoTimeToTS(now)
lock := latches.genLock(startTS, [][]byte{
[]byte("a"), []byte("b"),
})
Expand Down Expand Up @@ -142,7 +142,7 @@ func (s *testLatchSuite) TestRecycle(c *C) {
}
c.Assert(allEmpty, IsFalse)

currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3)
currentTS := oracle.GoTimeToTS(now.Add(expireDuration)) + 3
latches.recycle(currentTS)

for i := 0; i < len(latches.slots); i++ {
Expand Down
21 changes: 0 additions & 21 deletions store/tikv/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/logutil"
"go.uber.org/zap"
)

// Option represents available options for the oracle.Oracle.
Expand Down Expand Up @@ -100,19 +97,6 @@ const (

// ComposeTS creates a ts from physical and logical parts.
func ComposeTS(physical, logical int64) uint64 {
failpoint.Inject("changeTSFromPD", func(val failpoint.Value) {
valInt, ok := val.(int)
if ok {
origPhyTS := physical
logical := logical
newPhyTs := origPhyTS + int64(valInt)
origTS := uint64((physical << physicalShiftBits) + logical)
newTS := uint64((newPhyTs << physicalShiftBits) + logical)
logutil.BgLogger().Warn("ComposeTS failpoint", zap.Uint64("origTS", origTS),
zap.Int("valInt", valInt), zap.Uint64("ts", newTS))
failpoint.Return(newTS)
}
})
return uint64((physical << physicalShiftBits) + logical)
}

Expand All @@ -131,11 +115,6 @@ func GetPhysical(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond)
}

// EncodeTSO encodes a millisecond into tso.
func EncodeTSO(ts int64) uint64 {
return uint64(ts) << physicalShiftBits
}

// GetTimeFromTS extracts time.Time from a timestamp.
func GetTimeFromTS(ts uint64) time.Time {
ms := ExtractPhysical(ts)
Expand Down
10 changes: 4 additions & 6 deletions store/tikv/oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (l *localOracle) IsExpired(lockTS, TTL uint64, _ *oracle.Option) bool {
if l.hook != nil {
now = l.hook.currentTime
}
return oracle.GetPhysical(now) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
expire := oracle.GetTimeFromTS(lockTS).Add(time.Duration(TTL) * time.Millisecond)
return !now.Before(expire)
}

func (l *localOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64, error) {
Expand All @@ -52,8 +53,7 @@ func (l *localOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint6
if l.hook != nil {
now = l.hook.currentTime
}
physical := oracle.GetPhysical(now)
ts := oracle.ComposeTS(physical, 0)
ts := oracle.GoTimeToTS(now)
if l.lastTimeStampTS == ts {
l.n++
return ts + l.n, nil
Expand All @@ -80,9 +80,7 @@ func (l *localOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *o

// GetStaleTimestamp return physical
func (l *localOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
physical := oracle.GetPhysical(time.Now().Add(-time.Second * time.Duration(prevSecond)))
ts = oracle.ComposeTS(physical, 0)
return ts, nil
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
}

type future struct {
Expand Down
16 changes: 7 additions & 9 deletions store/tikv/oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ func (o *MockOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64
if o.stop {
return 0, errors.Trace(errStopped)
}
physical := oracle.GetPhysical(time.Now().Add(o.offset))
ts := oracle.ComposeTS(physical, 0)
if oracle.ExtractPhysical(o.lastTS) == physical {
ts := oracle.GoTimeToTS(time.Now().Add(o.offset))
if oracle.ExtractPhysical(o.lastTS) == oracle.ExtractPhysical(ts) {
ts = o.lastTS + 1
}
o.lastTS = ts
Expand All @@ -73,9 +72,7 @@ func (o *MockOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64

// GetStaleTimestamp implements oracle.Oracle interface.
func (o *MockOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
physical := oracle.GetPhysical(time.Now().Add(-time.Second * time.Duration(prevSecond)))
ts = oracle.ComposeTS(physical, 0)
return ts, nil
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
}

type mockOracleFuture struct {
Expand Down Expand Up @@ -106,15 +103,16 @@ func (o *MockOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *or
func (o *MockOracle) IsExpired(lockTimestamp, TTL uint64, _ *oracle.Option) bool {
o.RLock()
defer o.RUnlock()

return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL)
expire := oracle.GetTimeFromTS(lockTimestamp).Add(time.Duration(TTL) * time.Millisecond)
return !time.Now().Add(o.offset).Before(expire)
}

// UntilExpired implement oracle.Oracle interface.
func (o *MockOracle) UntilExpired(lockTimeStamp, TTL uint64, _ *oracle.Option) int64 {
o.RLock()
defer o.RUnlock()
return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(time.Now().Add(o.offset))
expire := oracle.GetTimeFromTS(lockTimeStamp).Add(time.Duration(TTL) * time.Millisecond)
return expire.Sub(time.Now().Add(o.offset)).Milliseconds()
}

// Close implements oracle.Oracle interface.
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, e
}

func (o *pdOracle) getArrivalTimestamp() uint64 {
return oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
return oracle.GoTimeToTS(time.Now())
}

func (o *pdOracle) setLastTS(ts uint64, txnScope string) {
Expand Down Expand Up @@ -288,7 +288,7 @@ func (o *pdOracle) getStaleTimestamp(txnScope string, prevSecond uint64) (uint64

staleTime := physicalTime.Add(-arrivalTime.Sub(time.Now().Add(-time.Duration(prevSecond) * time.Second)))

return oracle.ComposeTS(oracle.GetPhysical(staleTime), 0), nil
return oracle.GoTimeToTS(staleTime), nil
}

// GetStaleTimestamp generate a TSO which represents for the TSO prevSecond secs ago.
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func (s *testPDSuite) TestPDOracle_UntilExpired(c *C) {
lockAfter, lockExp := 10, 15
o := oracles.NewEmptyPDOracle()
start := time.Now()
oracles.SetEmptyPDOracleLastTs(o, oracle.ComposeTS(oracle.GetPhysical(start), 0))
lockTs := oracle.ComposeTS(oracle.GetPhysical(start.Add(time.Duration(lockAfter)*time.Millisecond)), 1)
oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start))
lockTs := oracle.GoTimeToTS((start.Add(time.Duration(lockAfter) * time.Millisecond))) + 1
waitTs := o.UntilExpired(lockTs, uint64(lockExp), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
c.Assert(waitTs, Equals, int64(lockAfter+lockExp), Commentf("waitTs shoulb be %d but got %d", int64(lockAfter+lockExp), waitTs))
}

func (s *testPDSuite) TestPdOracle_GetStaleTimestamp(c *C) {
o := oracles.NewEmptyPDOracle()
start := time.Now()
oracles.SetEmptyPDOracleLastTs(o, oracle.ComposeTS(oracle.GetPhysical(start), 0))
oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start))
ts, err := o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 10)
c.Assert(err, IsNil)

Expand Down Expand Up @@ -75,7 +75,7 @@ func (s *testPDSuite) TestPdOracle_GetStaleTimestamp(c *C) {
for _, testcase := range testcases {
comment := Commentf("%s", testcase.name)
start = time.Now()
oracles.SetEmptyPDOracleLastTs(o, oracle.ComposeTS(oracle.GetPhysical(start), 0))
oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start))
ts, err = o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, testcase.preSec)
if testcase.expectErr == "" {
c.Assert(err, IsNil, comment)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func (s *testCommitterSuite) TestPessimisticLockReturnValues(c *C) {
func (s *testCommitterSuite) TestElapsedTTL(c *C) {
key := []byte("key")
txn := s.begin(c)
txn.SetStartTS(oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(time.Second*10)), 1))
txn.SetStartTS(oracle.GoTimeToTS(time.Now().Add(time.Second*10)) + 1)
txn.SetPessimistic(true)
time.Sleep(time.Millisecond * 100)
lockCtx := &kv.LockCtx{
Expand Down
18 changes: 0 additions & 18 deletions store/tikv/tests/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -154,20 +153,3 @@ func (s *testStoreSuite) TestRequestPriority(c *C) {
}
iter.Close()
}

func (s *testStoreSerialSuite) TestOracleChangeByFailpoint(c *C) {
defer func() {
failpoint.Disable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD")
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD",
"return(10000)"), IsNil)
o := &oracles.MockOracle{}
s.store.SetOracle(o)
ctx := context.Background()
t1, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD"), IsNil)
t2, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
c.Assert(err, IsNil)
c.Assert(t1, Greater, t2)
}