From d708569747506b584c43378c7fdd329ec5e8af59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Fri, 15 Mar 2024 12:35:18 +0800 Subject: [PATCH] timer: fix some timezone behavior in timer framework (#51783) close pingcap/tidb#51768 --- pkg/timer/BUILD.bazel | 2 +- pkg/timer/api/client_test.go | 4 +- pkg/timer/api/mem_store.go | 21 +++++ pkg/timer/runtime/worker_test.go | 2 +- pkg/timer/store_intergartion_test.go | 136 +++++++++++++++++++++++++++ pkg/timer/tablestore/BUILD.bazel | 4 + pkg/timer/tablestore/sql_test.go | 77 ++++++++++++++- pkg/timer/tablestore/store.go | 79 ++++++++++++---- 8 files changed, 300 insertions(+), 25 deletions(-) diff --git a/pkg/timer/BUILD.bazel b/pkg/timer/BUILD.bazel index 967cd29c3a9d3..4a4ab63159e87 100644 --- a/pkg/timer/BUILD.bazel +++ b/pkg/timer/BUILD.bazel @@ -9,7 +9,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 5, + shard_count = 6, deps = [ "//pkg/sessionctx", "//pkg/testkit", diff --git a/pkg/timer/api/client_test.go b/pkg/timer/api/client_test.go index ac8316cdf7905..888defc1170ac 100644 --- a/pkg/timer/api/client_test.go +++ b/pkg/timer/api/client_test.go @@ -246,7 +246,7 @@ func TestDefaultClient(t *testing.T) { require.Empty(t, timer.EventData) require.True(t, timer.EventStart.IsZero()) require.Equal(t, []byte("s1"), timer.SummaryData) - require.Equal(t, eventStart, timer.Watermark) + require.Equal(t, eventStart.Unix(), timer.Watermark.Unix()) require.Equal(t, EventExtra{}, timer.EventExtra) // close event with option @@ -267,7 +267,7 @@ func TestDefaultClient(t *testing.T) { require.Empty(t, timer.EventData) require.True(t, timer.EventStart.IsZero()) require.Equal(t, []byte("s2"), timer.SummaryData) - require.Equal(t, watermark, timer.Watermark) + require.Equal(t, watermark.Unix(), timer.Watermark.Unix()) // manual trigger err = store.Update(ctx, timer.ID, &TimerUpdate{ diff --git a/pkg/timer/api/mem_store.go b/pkg/timer/api/mem_store.go index 803fa1d3fa16e..d46e24067a6ee 100644 --- a/pkg/timer/api/mem_store.go +++ b/pkg/timer/api/mem_store.go @@ -84,6 +84,8 @@ func (s *memoryStoreCore) Create(_ context.Context, record *TimerRecord) (string record.EventStatus = SchedEventIdle } + normalizeTimeFields(record) + if _, ok := s.id2Timers[record.ID]; ok { return "", errors.Trace(ErrTimerExists) } @@ -137,6 +139,7 @@ func (s *memoryStoreCore) Update(_ context.Context, timerID string, update *Time return err } + normalizeTimeFields(newRecord) if err = newRecord.Validate(); err != nil { return err } @@ -303,3 +306,21 @@ func getMemStoreTimeZoneLoc(tz string) *time.Location { return timeutil.SystemLocation() } + +func normalizeTimeFields(record *TimerRecord) { + if record.Location == nil { + return + } + + if !record.Watermark.IsZero() { + record.Watermark = record.Watermark.In(record.Location) + } + + if !record.EventStart.IsZero() { + record.EventStart = record.EventStart.In(record.Location) + } + + if !record.CreateTime.IsZero() { + record.CreateTime = record.CreateTime.In(record.Location) + } +} diff --git a/pkg/timer/runtime/worker_test.go b/pkg/timer/runtime/worker_test.go index 0c2aef2079bd7..21983bdaf7cc2 100644 --- a/pkg/timer/runtime/worker_test.go +++ b/pkg/timer/runtime/worker_test.go @@ -106,7 +106,7 @@ func prepareTimer(t *testing.T, cli api.TimerClient) *api.TimerRecord { require.Equal(t, "1m", timer.SchedPolicyExpr) require.Equal(t, "h1", timer.HookClass) require.True(t, timer.Enable) - require.Equal(t, watermark, timer.Watermark) + require.Equal(t, watermark.Unix(), timer.Watermark.Unix()) require.Equal(t, []byte("summary1"), timer.SummaryData) require.True(t, !timer.CreateTime.Before(now)) require.True(t, !timer.CreateTime.After(time.Now())) diff --git a/pkg/timer/store_intergartion_test.go b/pkg/timer/store_intergartion_test.go index b68a6a2b847d3..954c906ac17cc 100644 --- a/pkg/timer/store_intergartion_test.go +++ b/pkg/timer/store_intergartion_test.go @@ -16,6 +16,7 @@ package timer_test import ( "context" + "fmt" "sync/atomic" "testing" "time" @@ -256,6 +257,7 @@ func runTimerStoreUpdate(ctx context.Context, t *testing.T, store *api.TimerStor EventManualRequestID: "req2", EventWatermark: time.Unix(456, 0), } + tpl.CreateTime = tpl.CreateTime.In(time.UTC) require.Equal(t, *tpl, *record) // tags full update again @@ -328,6 +330,7 @@ func runTimerStoreUpdate(ctx context.Context, t *testing.T, store *api.TimerStor tpl.EventExtra = api.EventExtra{} tpl.Watermark = zeroTime tpl.SummaryData = nil + tpl.CreateTime = tpl.CreateTime.In(tpl.Location) require.Equal(t, *tpl, *record) // err check version @@ -872,3 +875,136 @@ func TestTableStoreManualTrigger(t *testing.T) { require.True(t, timer.ManualProcessed) require.Equal(t, api.EventExtra{}, timer.EventExtra) } + +func TestTimerStoreWithTimeZone(t *testing.T) { + // mem store + testTimerStoreWithTimeZone(t, api.NewMemoryTimerStore(), timeutil.SystemLocation().String()) + + // table store + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + dbName := "test" + tblName := "timerstore" + tk.MustExec("use test") + tk.MustExec(tablestore.CreateTimerTableSQL(dbName, tblName)) + tk.MustExec("set @@time_zone = 'America/Los_Angeles'") + + pool := pools.NewResourcePool(func() (pools.Resource, error) { + return tk.Session(), nil + }, 1, 1, time.Second) + defer pool.Close() + + timerStore := tablestore.NewTableTimerStore(1, pool, dbName, tblName, nil) + defer timerStore.Close() + + testTimerStoreWithTimeZone(t, timerStore, timeutil.SystemLocation().String()) + tk.MustExec("set @@global.time_zone='Asia/Tokyo'") + tk.MustExec(fmt.Sprintf("truncate table %s.%s", dbName, tblName)) + testTimerStoreWithTimeZone(t, timerStore, "Asia/Tokyo") + + // check time zone should be set back to the previous one. + require.Equal(t, "America/Los_Angeles", tk.Session().GetSessionVars().Location().String()) +} + +func testTimerStoreWithTimeZone(t *testing.T, timerStore *api.TimerStore, defaultTZ string) { + // 2024-11-03 09:30:00 UTC is 2024-11-03 01:30:00 -08:00 in `America/Los_Angeles` + // We should notice that it should not be regarded as 2024-11-03 01:30:00 -07:00 + // because of DST these two times have the same format in time zone `America/Los_Angeles`. + time1, err := time.ParseInLocation(time.DateTime, "2024-11-03 09:30:00", time.UTC) + require.NoError(t, err) + + time2, err := time.ParseInLocation(time.DateTime, "2024-11-03 08:30:00", time.UTC) + require.NoError(t, err) + + id1, err := timerStore.Create(context.TODO(), &api.TimerRecord{ + TimerSpec: api.TimerSpec{ + Namespace: "default", + Key: "test1", + SchedPolicyType: api.SchedEventInterval, + SchedPolicyExpr: "1h", + Watermark: time1, + }, + EventStatus: api.SchedEventTrigger, + EventStart: time2, + }) + require.NoError(t, err) + + id2, err := timerStore.Create(context.TODO(), &api.TimerRecord{ + TimerSpec: api.TimerSpec{ + Namespace: "default", + Key: "test2", + SchedPolicyType: api.SchedEventInterval, + SchedPolicyExpr: "1h", + Watermark: time2, + }, + EventStatus: api.SchedEventTrigger, + EventStart: time1, + }) + require.NoError(t, err) + + // create case + timer1, err := timerStore.GetByID(context.TODO(), id1) + require.NoError(t, err) + require.Equal(t, time1.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String()) + require.Equal(t, time2.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String()) + checkTimerRecordLocation(t, timer1, defaultTZ) + + timer2, err := timerStore.GetByID(context.TODO(), id2) + require.NoError(t, err) + require.Equal(t, time2.In(time.UTC).String(), timer2.Watermark.In(time.UTC).String()) + require.Equal(t, time1.In(time.UTC).String(), timer2.EventStart.In(time.UTC).String()) + checkTimerRecordLocation(t, timer2, defaultTZ) + + // update time + require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{ + Watermark: api.NewOptionalVal(time2), + EventStart: api.NewOptionalVal(time1), + })) + + require.NoError(t, timerStore.Update(context.TODO(), id2, &api.TimerUpdate{ + Watermark: api.NewOptionalVal(time1), + EventStart: api.NewOptionalVal(time2), + })) + + timer1, err = timerStore.GetByID(context.TODO(), id1) + require.NoError(t, err) + require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String()) + require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String()) + checkTimerRecordLocation(t, timer1, defaultTZ) + + timer2, err = timerStore.GetByID(context.TODO(), id2) + require.NoError(t, err) + require.Equal(t, time1.In(time.UTC).String(), timer2.Watermark.In(time.UTC).String()) + require.Equal(t, time2.In(time.UTC).String(), timer2.EventStart.In(time.UTC).String()) + checkTimerRecordLocation(t, timer2, defaultTZ) + + // update timezone + require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{ + TimeZone: api.NewOptionalVal("Europe/Berlin"), + })) + + timer1, err = timerStore.GetByID(context.TODO(), id1) + require.NoError(t, err) + require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String()) + require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String()) + checkTimerRecordLocation(t, timer1, "Europe/Berlin") + + require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{ + TimeZone: api.NewOptionalVal(""), + })) + + timer1, err = timerStore.GetByID(context.TODO(), id1) + require.NoError(t, err) + require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String()) + require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String()) + checkTimerRecordLocation(t, timer1, defaultTZ) +} + +func checkTimerRecordLocation(t *testing.T, r *api.TimerRecord, tz string) { + require.Equal(t, tz, r.Location.String()) + require.Same(t, r.Location, r.Watermark.Location()) + require.Same(t, r.Location, r.CreateTime.Location()) + if !r.EventStart.IsZero() { + require.Same(t, r.Location, r.EventStart.Location()) + } +} diff --git a/pkg/timer/tablestore/BUILD.bazel b/pkg/timer/tablestore/BUILD.bazel index 9af2e0252410c..1a49d4bf46803 100644 --- a/pkg/timer/tablestore/BUILD.bazel +++ b/pkg/timer/tablestore/BUILD.bazel @@ -39,9 +39,13 @@ go_test( shard_count = 8, deps = [ "//pkg/kv", + "//pkg/parser/ast", + "//pkg/parser/model", + "//pkg/parser/mysql", "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/timer/api", + "//pkg/types", "//pkg/util/sqlexec", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", diff --git a/pkg/timer/tablestore/sql_test.go b/pkg/timer/tablestore/sql_test.go index 3b63314e3906e..083d4b2f98f00 100644 --- a/pkg/timer/tablestore/sql_test.go +++ b/pkg/timer/tablestore/sql_test.go @@ -25,9 +25,13 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/timer/api" + "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -614,9 +618,56 @@ func TestTakeSession(t *testing.T) { require.EqualError(t, err, "mockErr") pool.AssertExpectations(t) - // Get returns a session + // init session returns error se := &mockSession{} pool.On("Get").Return(se, nil).Once() + se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)). + Return(nil, errors.New("mockErr")). + Once() + pool.On("Put", se).Once() + r, back, err = core.takeSession() + require.Nil(t, r) + require.Nil(t, back) + require.EqualError(t, err, "mockErr") + pool.AssertExpectations(t) + se.AssertExpectations(t) + + // init session returns error2 + pool.On("Get").Return(se, nil).Once() + se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)). + Return(nil, nil). + Once() + se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)). + Return(nil, errors.New("mockErr2")). + Once() + pool.On("Put", se).Once() + r, back, err = core.takeSession() + require.Nil(t, r) + require.Nil(t, back) + require.EqualError(t, err, "mockErr2") + pool.AssertExpectations(t) + se.AssertExpectations(t) + + // Get returns a session + pool.On("Get").Return(se, nil).Once() + rs := &sqlexec.SimpleRecordSet{ + ResultFields: []*ast.ResultField{{ + Column: &model.ColumnInfo{ + FieldType: *types.NewFieldType(mysql.TypeString), + }, + }}, + MaxChunkSize: 1, + Rows: [][]any{{"tz1"}}, + } + se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)). + Return(nil, nil). + Once() + se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)). + Return(rs, nil). + Once() + se.On("ExecuteInternal", matchCtx, "SET @@time_zone='UTC'", []any(nil)). + Return(nil, nil). + Once() r, back, err = core.takeSession() require.Equal(t, r, se) require.NotNil(t, back) @@ -633,8 +684,29 @@ func TestTakeSession(t *testing.T) { pool.AssertExpectations(t) se.AssertExpectations(t) + // Put session failed2 + se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)). + Return(nil, nil). + Once() + se.On("ExecuteInternal", matchCtx, "SET @@time_zone=%?", []any{"tz1"}). + Return(nil, errors.New("mockErr2")). + Once() + se.On("Close").Once() + back() + pool.AssertExpectations(t) + se.AssertExpectations(t) + // Put session success pool.On("Get").Return(se, nil).Once() + se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)). + Return(nil, nil). + Once() + se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)). + Return(rs, nil). + Once() + se.On("ExecuteInternal", matchCtx, "SET @@time_zone='UTC'", []any(nil)). + Return(nil, nil). + Once() r, back, err = core.takeSession() require.Equal(t, r, se) require.NotNil(t, back) @@ -642,6 +714,9 @@ func TestTakeSession(t *testing.T) { se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)). Return(nil, nil). Once() + se.On("ExecuteInternal", matchCtx, "SET @@time_zone=%?", []any{"tz1"}). + Return(nil, nil). + Once() pool.On("Put", se).Once() back() pool.AssertExpectations(t) diff --git a/pkg/timer/tablestore/store.go b/pkg/timer/tablestore/store.go index 988c23feb0961..3e6f275ed6002 100644 --- a/pkg/timer/tablestore/store.go +++ b/pkg/timer/tablestore/store.go @@ -141,6 +141,11 @@ func (s *tableTimerStoreCore) List(ctx context.Context, cond api.Cond) ([]*api.T return nil, err } + tidbTimeZone, err := sctx.GetSessionVars().GetGlobalSystemVar(ctx, variable.TimeZone) + if err != nil { + return nil, err + } + timers := make([]*api.TimerRecord, 0, len(rows)) for _, row := range rows { var timerData []byte @@ -148,12 +153,25 @@ func (s *tableTimerStoreCore) List(ctx context.Context, cond api.Cond) ([]*api.T timerData = row.GetBytes(3) } + tz := row.GetString(4) + tzParse := tz + // handling value "TIDB" is for compatibility of version 7.3.0 + if tz == "" || strings.EqualFold(tz, "TIDB") { + tzParse = tidbTimeZone + } + + loc, err := timeutil.ParseTimeZone(tzParse) + if err != nil { + loc = timeutil.SystemLocation() + } + var watermark time.Time if !row.IsNull(8) { watermark, err = row.GetTime(8).GoTime(seTZ) if err != nil { return nil, err } + watermark = watermark.In(loc) } var ext timerExt @@ -175,6 +193,7 @@ func (s *tableTimerStoreCore) List(ctx context.Context, cond api.Cond) ([]*api.T if err != nil { return nil, err } + eventStart = eventStart.In(loc) } var summaryData []byte @@ -188,6 +207,7 @@ func (s *tableTimerStoreCore) List(ctx context.Context, cond api.Cond) ([]*api.T if err != nil { return nil, err } + createTime = createTime.In(loc) } timer := &api.TimerRecord{ @@ -197,7 +217,7 @@ func (s *tableTimerStoreCore) List(ctx context.Context, cond api.Cond) ([]*api.T Key: row.GetString(2), Tags: ext.Tags, Data: timerData, - TimeZone: row.GetString(4), + TimeZone: tz, SchedPolicyType: api.SchedPolicyType(row.GetString(5)), SchedPolicyExpr: row.GetString(6), HookClass: row.GetString(7), @@ -211,25 +231,10 @@ func (s *tableTimerStoreCore) List(ctx context.Context, cond api.Cond) ([]*api.T EventStart: eventStart, EventExtra: ext.Event.ToEventExtra(), SummaryData: summaryData, + Location: loc, CreateTime: createTime, Version: row.GetUint64(18), } - - tz := timer.TimeZone - // handling value "TIDB" is for compatibility of version 7.3.0 - if tz == "" || strings.EqualFold(tz, "TIDB") { - if tz, err = sctx.GetSessionVars().GetGlobalSystemVar(ctx, variable.TimeZone); err != nil { - return nil, err - } - } - - loc, err := timeutil.ParseTimeZone(tz) - if err == nil { - timer.Location = loc - } else { - timer.Location = timeutil.SystemLocation() - } - timers = append(timers, timer) } return timers, nil @@ -327,20 +332,47 @@ func (s *tableTimerStoreCore) Close() { s.notifier.Close() } -func (s *tableTimerStoreCore) takeSession() (sessionctx.Context, func(), error) { +func (s *tableTimerStoreCore) takeSession() (_ sessionctx.Context, _ func(), err error) { r, err := s.pool.Get() if err != nil { return nil, nil, err } + defer func() { + if err != nil { + s.pool.Put(r) + } + }() + sctx, ok := r.(sessionctx.Context) if !ok { - s.pool.Put(r) return nil, nil, errors.New("session is not the type sessionctx.Context") } + ctx := context.Background() + + // rollback first to terminate unexpected transactions + if _, err = executeSQL(ctx, sctx, "ROLLBACK"); err != nil { + return nil, nil, err + } + + // we should force to set time zone to UTC to make sure time operations are consistent. + rows, err := executeSQL(ctx, sctx, "SELECT @@time_zone") + if err != nil { + return nil, nil, err + } + + if len(rows) == 0 || rows[0].Len() == 0 { + return nil, nil, errors.New("failed to get original time zone of session") + } + + if _, err = executeSQL(ctx, sctx, "SET @@time_zone='UTC'"); err != nil { + return nil, nil, err + } + + originalTimeZone := rows[0].GetString(0) back := func() { - if _, err = executeSQL(context.Background(), sctx, "ROLLBACK"); err != nil { + if _, err = executeSQL(ctx, sctx, "ROLLBACK"); err != nil { // Though this branch is rarely to be called because "ROLLBACK" will always be successfully, we still need // to handle it here to make sure the code is strong. terror.Log(err) @@ -348,6 +380,13 @@ func (s *tableTimerStoreCore) takeSession() (sessionctx.Context, func(), error) r.Close() return } + + if _, err = executeSQL(ctx, sctx, "SET @@time_zone=%?", originalTimeZone); err != nil { + terror.Log(err) + r.Close() + return + } + s.pool.Put(r) }