From 368e8a95748d422124768d3f1727b2a867fa215a Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Mon, 13 May 2019 12:25:30 +0800 Subject: [PATCH 1/4] Add tidb_low_resolution_tso session scope variable Low resolution timestamp is updated once every two seconds which can be used if reading slightly old version of data is acceptable. Like tidb_snapshot read, transaction is readonly when tidb_low_resolution_tso is enabled. Signed-off-by: Xiaoguang Sun --- executor/adapter.go | 4 ++++ executor/executor_test.go | 22 ++++++++++++++++++++++ session/txn.go | 17 +++++++++++++---- sessionctx/variable/session.go | 6 ++++++ sessionctx/variable/sysvar.go | 1 + sessionctx/variable/sysvar_test.go | 2 ++ sessionctx/variable/tidb_vars.go | 3 +++ sessionctx/variable/varsutil.go | 2 +- sessionctx/variable/varsutil_test.go | 11 +++++++++++ store/mockoracle/oracle.go | 10 ++++++++++ store/tikv/oracle/oracle.go | 2 ++ store/tikv/oracle/oracles/local.go | 8 ++++++++ store/tikv/oracle/oracles/pd.go | 18 ++++++++++++++++++ 13 files changed, 101 insertions(+), 5 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 3709a34ecfbd6..88631aa1e6058 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -373,6 +373,10 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlex if snapshotTS != 0 { return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set") } + lowResolutionTSO := sctx.GetSessionVars().LowResolutionTSO + if lowResolutionTSO { + return nil, errors.New("can not execute write statement when 'tidb_low_resolution_tso' is set") + } } var err error diff --git a/executor/executor_test.go b/executor/executor_test.go index c21b8104fae28..7eb1be4dadf81 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -2115,6 +2115,28 @@ func (s *testSuite) TestHistoryRead(c *C) { tk.MustQuery("select * from history_read order by a").Check(testkit.Rows("2 ", "4 ", "8 8", "9 9")) } +func (s *testSuite) TestLowResolutionTSORead(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set @@autocommit=1") + tk.MustExec("use test") + tk.MustExec("drop table if exists low_resolution_tso") + tk.MustExec("create table low_resolution_tso(a int)") + tk.MustExec("insert low_resolution_tso values (1)") + + // enable low resolution tso + c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsFalse) + tk.Exec("set @@tidb_low_resolution_tso = 'on'") + c.Assert(tk.Se.GetSessionVars().LowResolutionTSO, IsTrue) + + time.Sleep(3 * time.Second) + tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("1")) + _, err := tk.Exec("update low_resolution_tso set a = 2") + c.Assert(err, NotNil) + tk.MustExec("set @@tidb_low_resolution_tso = 'off'") + tk.MustExec("update low_resolution_tso set a = 2") + tk.MustQuery("select * from low_resolution_tso").Check(testkit.Rows("2")) +} + func (s *testSuite) TestScanControlSelection(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/session/txn.go b/session/txn.go index 4942b6710cae8..27b7aeb339256 100644 --- a/session/txn.go +++ b/session/txn.go @@ -381,8 +381,9 @@ func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) { // txnFuture is a promise, which promises to return a txn in future. type txnFuture struct { - future oracle.Future - store kv.Storage + future oracle.Future + store kv.Storage + lowResolution bool mockFail bool } @@ -408,8 +409,16 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { } oracleStore := s.store.GetOracle() - tsFuture := oracleStore.GetTimestampAsync(ctx) - ret := &txnFuture{future: tsFuture, store: s.store} + var tsFuture oracle.Future + var lowResolution bool + if s.sessionVars.LowResolutionTSO { + tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx) + lowResolution = true + } else { + tsFuture = oracleStore.GetTimestampAsync(ctx) + lowResolution = false + } + ret := &txnFuture{future: tsFuture, store: s.store, lowResolution: lowResolution} if x := ctx.Value("mockGetTSFail"); x != nil { ret.mockFail = true } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 1deb125aafd17..e760fad906293 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -376,6 +376,9 @@ type SessionVars struct { // PessimisticLock indicates whether new transaction should be pessimistic . PessimisticLock bool + + // LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds + LowResolutionTSO bool } // ConnectionInfo present connection used by audit. @@ -426,6 +429,7 @@ func NewSessionVars() *SessionVars { CommandValue: uint32(mysql.ComSleep), TiDBOptJoinReorderThreshold: DefTiDBOptJoinReorderThreshold, SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, + LowResolutionTSO: false, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, @@ -793,6 +797,8 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.WaitTableSplitFinish = TiDBOptOn(val) case TiDBPessimisticLock: s.PessimisticLock = TiDBOptOn(val) + case TiDBLowResolutionTSO: + s.LowResolutionTSO = TiDBOptOn(val) } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index bd8cf2d326f88..3c349a07f66b4 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -696,6 +696,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBCheckMb4ValueInUTF8, BoolToIntStr(config.GetGlobalConfig().CheckMb4ValueInUTF8)}, {ScopeSession, TiDBSlowQueryFile, ""}, {ScopeSession, TiDBWaitTableSplitFinish, BoolToIntStr(DefTiDBWaitTableSplitFinish)}, + {ScopeSession, TiDBLowResolutionTSO, "0"}, } // SynonymsSysVariables is synonyms of system variables. diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index dfdc410b88343..e861e120a9fd4 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -44,4 +44,6 @@ func (*testSysVarSuite) TestSysVar(c *C) { c.Assert(f, NotNil) c.Assert(f.Value, Equals, "4000") + f = GetSysVar("tidb_low_resolution_tso") + c.Assert(f.Value, Equals, "0") } diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index b8ae7498ec1cb..31406bed7683f 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -138,6 +138,9 @@ const ( // tidb_skip_isolation_level_check is used to control whether to return error when set unsupported transaction // isolation level. TiDBSkipIsolationLevelCheck = "tidb_skip_isolation_level_check" + + // TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds + TiDBLowResolutionTSO = "tidb_low_resolution_tso" ) // TiDB system variable names that both in session and global scope. diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 235d35baa22a9..edd9db2b3c9a3 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -417,7 +417,7 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, TiDBOptInSubqToJoinAndAgg, TiDBEnableFastAnalyze, TiDBBatchInsert, TiDBDisableTxnAutoRetry, TiDBEnableStreaming, TiDBBatchDelete, TiDBBatchCommit, TiDBEnableCascadesPlanner, TiDBEnableWindowFunction, - TiDBCheckMb4ValueInUTF8: + TiDBCheckMb4ValueInUTF8, TiDBLowResolutionTSO: if strings.EqualFold(value, "ON") || value == "1" || strings.EqualFold(value, "OFF") || value == "0" { return value, nil } diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 4875582fd39e0..f4af0357a682b 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -276,6 +276,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(val, Equals, "0") c.Assert(config.GetGlobalConfig().CheckMb4ValueInUTF8, Equals, false) + SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("1")) + val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO) + c.Assert(err, IsNil) + c.Assert(val, Equals, "1") + c.Assert(v.LowResolutionTSO, Equals, true) + SetSessionSystemVar(v, TiDBLowResolutionTSO, types.NewStringDatum("0")) + val, err = GetSessionSystemVar(v, TiDBLowResolutionTSO) + c.Assert(err, IsNil) + c.Assert(val, Equals, "0") + c.Assert(v.LowResolutionTSO, Equals, false) + c.Assert(v.CorrelationThreshold, Equals, 0.9) err = SetSessionSystemVar(v, TiDBOptCorrelationThreshold, types.NewStringDatum("0")) c.Assert(err, IsNil) diff --git a/store/mockoracle/oracle.go b/store/mockoracle/oracle.go index 07947cde6422f..333c66d6e576d 100644 --- a/store/mockoracle/oracle.go +++ b/store/mockoracle/oracle.go @@ -92,6 +92,16 @@ func (o *MockOracle) GetTimestampAsync(ctx context.Context) oracle.Future { return &mockOracleFuture{o, ctx} } +// GetLowResolutionTimestamp implements oracle.Oracle interface. +func (o *MockOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) { + return o.GetTimestamp(ctx) +} + +// GetLowResolutionTimestampAsync implements oracle.Oracle interface. +func (o *MockOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future { + return o.GetTimestampAsync(ctx) +} + // IsExpired implements oracle.Oracle interface. func (o *MockOracle) IsExpired(lockTimestamp uint64, TTL uint64) bool { o.RLock() diff --git a/store/tikv/oracle/oracle.go b/store/tikv/oracle/oracle.go index b7b8857e8a93d..8d54c949898e0 100644 --- a/store/tikv/oracle/oracle.go +++ b/store/tikv/oracle/oracle.go @@ -22,6 +22,8 @@ import ( type Oracle interface { GetTimestamp(ctx context.Context) (uint64, error) GetTimestampAsync(ctx context.Context) Future + GetLowResolutionTimestamp(ctx context.Context) (uint64, error) + GetLowResolutionTimestampAsync(ctx context.Context) Future IsExpired(lockTimestamp uint64, TTL uint64) bool Close() } diff --git a/store/tikv/oracle/oracles/local.go b/store/tikv/oracle/oracles/local.go index b56a32f378127..daf260fe8cc8e 100644 --- a/store/tikv/oracle/oracles/local.go +++ b/store/tikv/oracle/oracles/local.go @@ -59,6 +59,14 @@ func (l *localOracle) GetTimestampAsync(ctx context.Context) oracle.Future { } } +func (l *localOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) { + return l.GetTimestamp(ctx) +} + +func (l *localOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future { + return l.GetTimestampAsync(ctx) +} + type future struct { ctx context.Context l *localOracle diff --git a/store/tikv/oracle/oracles/pd.go b/store/tikv/oracle/oracles/pd.go index 3a044d261714e..f9b46b0cf6f03 100644 --- a/store/tikv/oracle/oracles/pd.go +++ b/store/tikv/oracle/oracles/pd.go @@ -140,3 +140,21 @@ func (o *pdOracle) updateTS(ctx context.Context, interval time.Duration) { func (o *pdOracle) Close() { close(o.quit) } + +type lowResolutionTsFuture uint64 + +// Wait implements the oracle.Future interface. +func (f lowResolutionTsFuture) Wait() (uint64, error) { + return uint64(f), nil +} + +// GetLowResolutionTimestamp gets a new increasing time. +func (o *pdOracle) GetLowResolutionTimestamp(ctx context.Context) (uint64, error) { + lastTS := atomic.LoadUint64(&o.lastTS) + return lastTS, nil +} + +func (o *pdOracle) GetLowResolutionTimestampAsync(ctx context.Context) oracle.Future { + lastTS := atomic.LoadUint64(&o.lastTS) + return lowResolutionTsFuture(lastTS) +} From e266d7c008f9655e89217c9d8108bc9772547526 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Tue, 14 May 2019 09:17:31 +0800 Subject: [PATCH 2/4] Remove not used lowResolution flag in txnFuture Signed-off-by: Xiaoguang Sun --- session/txn.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/session/txn.go b/session/txn.go index 27b7aeb339256..15f0d6405b7e4 100644 --- a/session/txn.go +++ b/session/txn.go @@ -381,9 +381,8 @@ func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) { // txnFuture is a promise, which promises to return a txn in future. type txnFuture struct { - future oracle.Future - store kv.Storage - lowResolution bool + future oracle.Future + store kv.Storage mockFail bool } @@ -410,15 +409,12 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { oracleStore := s.store.GetOracle() var tsFuture oracle.Future - var lowResolution bool if s.sessionVars.LowResolutionTSO { tsFuture = oracleStore.GetLowResolutionTimestampAsync(ctx) - lowResolution = true } else { tsFuture = oracleStore.GetTimestampAsync(ctx) - lowResolution = false } - ret := &txnFuture{future: tsFuture, store: s.store, lowResolution: lowResolution} + ret := &txnFuture{future: tsFuture, store: s.store} if x := ctx.Value("mockGetTSFail"); x != nil { ret.mockFail = true } From bc79d29637192d60641098910e056a5c26ecac23 Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Tue, 14 May 2019 09:23:07 +0800 Subject: [PATCH 3/4] Fix comments Signed-off-by: Xiaoguang Sun --- sessionctx/variable/session.go | 4 ++-- store/tikv/oracle/oracles/pd.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e760fad906293..5babacea0f655 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -374,10 +374,10 @@ type SessionVars struct { // EnableFastAnalyze indicates whether to take fast analyze. EnableFastAnalyze bool - // PessimisticLock indicates whether new transaction should be pessimistic . + // PessimisticLock indicates whether new transaction should be pessimistic. PessimisticLock bool - // LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds + // LowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds. LowResolutionTSO bool } diff --git a/store/tikv/oracle/oracles/pd.go b/store/tikv/oracle/oracles/pd.go index f9b46b0cf6f03..d361e13e0f712 100644 --- a/store/tikv/oracle/oracles/pd.go +++ b/store/tikv/oracle/oracles/pd.go @@ -141,6 +141,7 @@ func (o *pdOracle) Close() { close(o.quit) } +// A future that resolves immediately to a low resolution timestamp. type lowResolutionTsFuture uint64 // Wait implements the oracle.Future interface. From b2e5d403448a1ccf44b9cd79d8bf920207555e4d Mon Sep 17 00:00:00 2001 From: Xiaoguang Sun Date: Wed, 22 May 2019 21:38:03 +0800 Subject: [PATCH 4/4] Do not initialize LowResolutionTSO explicitly Signed-off-by: Xiaoguang Sun --- sessionctx/variable/session.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 5babacea0f655..a96d616878e7f 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -429,7 +429,6 @@ func NewSessionVars() *SessionVars { CommandValue: uint32(mysql.ComSleep), TiDBOptJoinReorderThreshold: DefTiDBOptJoinReorderThreshold, SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, - LowResolutionTSO: false, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency,