Skip to content

Commit

Permalink
oracle: simplify timestamp utilities (#24688)
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing authored May 19, 2021
1 parent d1a5fa3 commit a2278df
Show file tree
Hide file tree
Showing 18 changed files with 40 additions and 90 deletions.
5 changes: 2 additions & 3 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,10 +1217,9 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo

failpoint.Inject("mockUpdateCachedSafePoint", func(val failpoint.Value) {
if val.(bool) {
// 18 is for the logical time.
ts := oracle.GetPhysical(time.Now()) << 18
ts := oracle.GoTimeToTS(time.Now())
s := reorg.d.store.(tikv.Storage)
s.UpdateSPCache(uint64(ts), time.Now())
s.UpdateSPCache(ts, time.Now())
time.Sleep(time.Millisecond * 3)
}
})
Expand Down
4 changes: 2 additions & 2 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (*testSuite) TestT(c *C) {
c.Assert(dd, NotNil)
c.Assert(dd.GetLease(), Equals, 80*time.Millisecond)

snapTS := oracle.EncodeTSO(oracle.GetPhysical(time.Now()))
snapTS := oracle.GoTimeToTS(time.Now())
cs := &ast.CharsetOpt{
Chs: "utf8",
Col: "utf8_bin",
Expand Down Expand Up @@ -317,7 +317,7 @@ func (*testSuite) TestT(c *C) {
c.Assert(err, IsNil)

// for GetSnapshotInfoSchema
currSnapTS := oracle.EncodeTSO(oracle.GetPhysical(time.Now()))
currSnapTS := oracle.GoTimeToTS(time.Now())
currSnapIs, err := dom.GetSnapshotInfoSchema(currSnapTS)
c.Assert(err, IsNil)
c.Assert(currSnapIs, NotNil)
Expand Down
2 changes: 1 addition & 1 deletion domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
logutil.BgLogger().Error("update minStartTS failed", zap.Error(err))
return
}
now := time.Unix(0, oracle.ExtractPhysical(currentVer.Ver)*1e6)
now := oracle.GetTimeFromTS(currentVer.Ver)
startTSLowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse)

minStartTS := oracle.GoTimeToTS(now)
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2727,7 +2727,7 @@ func (s *testSuiteP2) TestHistoryRead(c *C) {
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 <nil>", "4 <nil>", "8 8", "9 9"))
tk.MustExec("set @@tidb_snapshot = '" + snapshotTime.Format("2006-01-02 15:04:05.999999") + "'")
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2", "4"))
tsoStr := strconv.FormatUint(oracle.EncodeTSO(snapshotTime.UnixNano()/int64(time.Millisecond)), 10)
tsoStr := strconv.FormatUint(oracle.GoTimeToTS(snapshotTime), 10)

tk.MustExec("set @@tidb_snapshot = '" + tsoStr + "'")
tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2", "4"))
Expand Down
3 changes: 1 addition & 2 deletions executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -203,7 +202,7 @@ func (e *ShowExec) histogramToRow(dbName, tblName, partitionName, colName string
}

func (e *ShowExec) versionToTime(version uint64) types.Time {
t := time.Unix(0, oracle.ExtractPhysical(version)*int64(time.Millisecond))
t := oracle.GetTimeFromTS(version)
return types.NewTime(types.FromGoTime(t), mysql.TypeDatetime, 0)
}

Expand Down
4 changes: 2 additions & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.ComposeTS(gt.Unix()*1000, 0)
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
case ast.TimestampBoundExactStaleness:
// TODO: support funcCallExpr in future
Expand Down Expand Up @@ -668,7 +668,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.ComposeTS(gt.Unix()*1000, 0)
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
}
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
Expand Down
12 changes: 4 additions & 8 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,15 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectSafeTS: func() uint64 {
phy := time.Now().Add(-10*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectSafeTS: func() uint64 {
phy := time.Now().Add(-20*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
},
Expand All @@ -217,8 +215,7 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
phy := time.Now().Add(-10*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
},
Expand All @@ -229,8 +226,7 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
phy := time.Now().Add(-20*time.Second).Unix() * 1000
return oracle.ComposeTS(phy, 0)
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
},
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,7 +1453,7 @@ func (s *testStatsSuite) TestNeedAnalyzeTable(c *C) {
}{
// table was never analyzed and has reach the limit
{
tbl: &statistics.Table{Version: oracle.EncodeTSO(oracle.GetPhysical(time.Now()))},
tbl: &statistics.Table{Version: oracle.GoTimeToTS(time.Now())},
limit: 0,
ratio: 0,
start: "00:00 +0800",
Expand All @@ -1464,7 +1464,7 @@ func (s *testStatsSuite) TestNeedAnalyzeTable(c *C) {
},
// table was never analyzed but has not reach the limit
{
tbl: &statistics.Table{Version: oracle.EncodeTSO(oracle.GetPhysical(time.Now()))},
tbl: &statistics.Table{Version: oracle.GoTimeToTS(time.Now())},
limit: time.Hour,
ratio: 0,
start: "00:00 +0800",
Expand Down
6 changes: 3 additions & 3 deletions store/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ func (s *testGCWorkerSuite) TestMinStartTS(c *C) {
strconv.FormatUint(now, 10))
c.Assert(err, IsNil)
err = spkv.Put(fmt.Sprintf("%s/%s", infosync.ServerMinStartTSPath, "b"),
strconv.FormatUint(now-oracle.EncodeTSO(20000), 10))
strconv.FormatUint(now-oracle.ComposeTS(20000, 0), 10))
c.Assert(err, IsNil)
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.EncodeTSO(10000))
c.Assert(sp, Equals, now-oracle.EncodeTSO(20000)-1)
sp = s.gcWorker.calcSafePointByMinStartTS(ctx, now-oracle.ComposeTS(10000, 0))
c.Assert(sp, Equals, now-oracle.ComposeTS(20000, 0)-1)
}

func (s *testGCWorkerSuite) TestPrepareGC(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,7 +1428,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64

func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error {
// Amend txn with current time first, then we can make sure we have another SafeWindow time to commit
currentTS := oracle.EncodeTSO(int64(time.Since(c.txn.startTime)/time.Millisecond)) + c.startTS
currentTS := oracle.ComposeTS(int64(time.Since(c.txn.startTime)/time.Millisecond), 0) + c.startTS
_, _, err := c.checkSchemaValid(ctx, currentTS, c.txn.schemaVer, true)
if err != nil {
logutil.Logger(ctx).Info("Schema changed for async commit txn",
Expand All @@ -1438,7 +1438,7 @@ func (c *twoPhaseCommitter) calculateMaxCommitTS(ctx context.Context) error {
}

safeWindow := config.GetGlobalConfig().TiKVClient.AsyncCommit.SafeWindow
maxCommitTS := oracle.EncodeTSO(int64(safeWindow/time.Millisecond)) + currentTS
maxCommitTS := oracle.ComposeTS(int64(safeWindow/time.Millisecond), 0) + currentTS
logutil.BgLogger().Debug("calculate MaxCommitTS",
zap.Time("startTime", c.txn.startTime),
zap.Duration("safeWindow", safeWindow),
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/latch/latch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *testLatchSuite) TestFirstAcquireFailedWithStale(c *C) {
func (s *testLatchSuite) TestRecycle(c *C) {
latches := NewLatches(8)
now := time.Now()
startTS := oracle.ComposeTS(oracle.GetPhysical(now), 0)
startTS := oracle.GoTimeToTS(now)
lock := latches.genLock(startTS, [][]byte{
[]byte("a"), []byte("b"),
})
Expand Down Expand Up @@ -142,7 +142,7 @@ func (s *testLatchSuite) TestRecycle(c *C) {
}
c.Assert(allEmpty, IsFalse)

currentTS := oracle.ComposeTS(oracle.GetPhysical(now.Add(expireDuration)), 3)
currentTS := oracle.GoTimeToTS(now.Add(expireDuration)) + 3
latches.recycle(currentTS)

for i := 0; i < len(latches.slots); i++ {
Expand Down
22 changes: 0 additions & 22 deletions store/tikv/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ package oracle
import (
"context"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/store/tikv/logutil"
"go.uber.org/zap"
)

// Option represents available options for the oracle.Oracle.
Expand Down Expand Up @@ -53,19 +49,6 @@ const (

// ComposeTS creates a ts from physical and logical parts.
func ComposeTS(physical, logical int64) uint64 {
failpoint.Inject("changeTSFromPD", func(val failpoint.Value) {
valInt, ok := val.(int)
if ok {
origPhyTS := physical
logical := logical
newPhyTs := origPhyTS + int64(valInt)
origTS := uint64((physical << physicalShiftBits) + logical)
newTS := uint64((newPhyTs << physicalShiftBits) + logical)
logutil.BgLogger().Warn("ComposeTS failpoint", zap.Uint64("origTS", origTS),
zap.Int("valInt", valInt), zap.Uint64("ts", newTS))
failpoint.Return(newTS)
}
})
return uint64((physical << physicalShiftBits) + logical)
}

Expand All @@ -84,11 +67,6 @@ func GetPhysical(t time.Time) int64 {
return t.UnixNano() / int64(time.Millisecond)
}

// EncodeTSO encodes a millisecond into tso.
func EncodeTSO(ts int64) uint64 {
return uint64(ts) << physicalShiftBits
}

// GetTimeFromTS extracts time.Time from a timestamp.
func GetTimeFromTS(ts uint64) time.Time {
ms := ExtractPhysical(ts)
Expand Down
10 changes: 4 additions & 6 deletions store/tikv/oracle/oracles/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (l *localOracle) IsExpired(lockTS, TTL uint64, _ *oracle.Option) bool {
if l.hook != nil {
now = l.hook.currentTime
}
return oracle.GetPhysical(now) >= oracle.ExtractPhysical(lockTS)+int64(TTL)
expire := oracle.GetTimeFromTS(lockTS).Add(time.Duration(TTL) * time.Millisecond)
return !now.Before(expire)
}

func (l *localOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64, error) {
Expand All @@ -52,8 +53,7 @@ func (l *localOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint6
if l.hook != nil {
now = l.hook.currentTime
}
physical := oracle.GetPhysical(now)
ts := oracle.ComposeTS(physical, 0)
ts := oracle.GoTimeToTS(now)
if l.lastTimeStampTS == ts {
l.n++
return ts + l.n, nil
Expand All @@ -80,9 +80,7 @@ func (l *localOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *o

// GetStaleTimestamp return physical
func (l *localOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
physical := oracle.GetPhysical(time.Now().Add(-time.Second * time.Duration(prevSecond)))
ts = oracle.ComposeTS(physical, 0)
return ts, nil
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
}

type future struct {
Expand Down
16 changes: 7 additions & 9 deletions store/tikv/oracle/oracles/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@ func (o *MockOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64
if o.stop {
return 0, errors.Trace(errStopped)
}
physical := oracle.GetPhysical(time.Now().Add(o.offset))
ts := oracle.ComposeTS(physical, 0)
if oracle.ExtractPhysical(o.lastTS) == physical {
ts := oracle.GoTimeToTS(time.Now().Add(o.offset))
if oracle.ExtractPhysical(o.lastTS) == oracle.ExtractPhysical(ts) {
ts = o.lastTS + 1
}
o.lastTS = ts
Expand All @@ -73,9 +72,7 @@ func (o *MockOracle) GetTimestamp(ctx context.Context, _ *oracle.Option) (uint64

// GetStaleTimestamp implements oracle.Oracle interface.
func (o *MockOracle) GetStaleTimestamp(ctx context.Context, txnScope string, prevSecond uint64) (ts uint64, err error) {
physical := oracle.GetPhysical(time.Now().Add(-time.Second * time.Duration(prevSecond)))
ts = oracle.ComposeTS(physical, 0)
return ts, nil
return oracle.GoTimeToTS(time.Now().Add(-time.Second * time.Duration(prevSecond))), nil
}

type mockOracleFuture struct {
Expand Down Expand Up @@ -106,15 +103,16 @@ func (o *MockOracle) GetLowResolutionTimestampAsync(ctx context.Context, opt *or
func (o *MockOracle) IsExpired(lockTimestamp, TTL uint64, _ *oracle.Option) bool {
o.RLock()
defer o.RUnlock()

return oracle.GetPhysical(time.Now().Add(o.offset)) >= oracle.ExtractPhysical(lockTimestamp)+int64(TTL)
expire := oracle.GetTimeFromTS(lockTimestamp).Add(time.Duration(TTL) * time.Millisecond)
return !time.Now().Add(o.offset).Before(expire)
}

// UntilExpired implement oracle.Oracle interface.
func (o *MockOracle) UntilExpired(lockTimeStamp, TTL uint64, _ *oracle.Option) int64 {
o.RLock()
defer o.RUnlock()
return oracle.ExtractPhysical(lockTimeStamp) + int64(TTL) - oracle.GetPhysical(time.Now().Add(o.offset))
expire := oracle.GetTimeFromTS(lockTimeStamp).Add(time.Duration(TTL) * time.Millisecond)
return expire.Sub(time.Now().Add(o.offset)).Milliseconds()
}

// Close implements oracle.Oracle interface.
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/oracle/oracles/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (o *pdOracle) getTimestamp(ctx context.Context, txnScope string) (uint64, e
}

func (o *pdOracle) getArrivalTimestamp() uint64 {
return oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0)
return oracle.GoTimeToTS(time.Now())
}

func (o *pdOracle) setLastTS(ts uint64, txnScope string) {
Expand Down Expand Up @@ -288,7 +288,7 @@ func (o *pdOracle) getStaleTimestamp(txnScope string, prevSecond uint64) (uint64

staleTime := physicalTime.Add(-arrivalTime.Sub(time.Now().Add(-time.Duration(prevSecond) * time.Second)))

return oracle.ComposeTS(oracle.GetPhysical(staleTime), 0), nil
return oracle.GoTimeToTS(staleTime), nil
}

// GetStaleTimestamp generate a TSO which represents for the TSO prevSecond secs ago.
Expand Down
8 changes: 4 additions & 4 deletions store/tikv/oracle/oracles/pd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func (s *testPDSuite) TestPDOracle_UntilExpired(c *C) {
lockAfter, lockExp := 10, 15
o := oracles.NewEmptyPDOracle()
start := time.Now()
oracles.SetEmptyPDOracleLastTs(o, oracle.ComposeTS(oracle.GetPhysical(start), 0))
lockTs := oracle.ComposeTS(oracle.GetPhysical(start.Add(time.Duration(lockAfter)*time.Millisecond)), 1)
oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start))
lockTs := oracle.GoTimeToTS((start.Add(time.Duration(lockAfter) * time.Millisecond))) + 1
waitTs := o.UntilExpired(lockTs, uint64(lockExp), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
c.Assert(waitTs, Equals, int64(lockAfter+lockExp), Commentf("waitTs shoulb be %d but got %d", int64(lockAfter+lockExp), waitTs))
}

func (s *testPDSuite) TestPdOracle_GetStaleTimestamp(c *C) {
o := oracles.NewEmptyPDOracle()
start := time.Now()
oracles.SetEmptyPDOracleLastTs(o, oracle.ComposeTS(oracle.GetPhysical(start), 0))
oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start))
ts, err := o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, 10)
c.Assert(err, IsNil)

Expand Down Expand Up @@ -75,7 +75,7 @@ func (s *testPDSuite) TestPdOracle_GetStaleTimestamp(c *C) {
for _, testcase := range testcases {
comment := Commentf("%s", testcase.name)
start = time.Now()
oracles.SetEmptyPDOracleLastTs(o, oracle.ComposeTS(oracle.GetPhysical(start), 0))
oracles.SetEmptyPDOracleLastTs(o, oracle.GoTimeToTS(start))
ts, err = o.GetStaleTimestamp(context.Background(), oracle.GlobalTxnScope, testcase.preSec)
if testcase.expectErr == "" {
c.Assert(err, IsNil, comment)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/tests/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,7 @@ func (s *testCommitterSuite) TestPessimisticLockReturnValues(c *C) {
func (s *testCommitterSuite) TestElapsedTTL(c *C) {
key := []byte("key")
txn := s.begin(c)
txn.SetStartTS(oracle.ComposeTS(oracle.GetPhysical(time.Now().Add(time.Second*10)), 1))
txn.SetStartTS(oracle.GoTimeToTS(time.Now().Add(time.Second*10)) + 1)
txn.SetPessimistic(true)
time.Sleep(time.Millisecond * 100)
lockCtx := &kv.LockCtx{
Expand Down
18 changes: 0 additions & 18 deletions store/tikv/tests/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -154,20 +153,3 @@ func (s *testStoreSuite) TestRequestPriority(c *C) {
}
iter.Close()
}

func (s *testStoreSerialSuite) TestOracleChangeByFailpoint(c *C) {
defer func() {
failpoint.Disable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD")
}()
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD",
"return(10000)"), IsNil)
o := &oracles.MockOracle{}
s.store.SetOracle(o)
ctx := context.Background()
t1, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
c.Assert(err, IsNil)
c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/oracle/changeTSFromPD"), IsNil)
t2, err := s.store.GetTimestampWithRetry(tikv.NewBackofferWithVars(ctx, 100, nil), oracle.GlobalTxnScope)
c.Assert(err, IsNil)
c.Assert(t1, Greater, t2)
}

0 comments on commit a2278df

Please sign in to comment.