Skip to content

Commit

Permalink
timer: fix some timezone behavior in timer framework (#51783)
Browse files Browse the repository at this point in the history
close #51768
  • Loading branch information
lcwangchao committed Mar 15, 2024
1 parent 857e5f9 commit d708569
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/timer/BUILD.bazel
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
"//pkg/sessionctx",
"//pkg/testkit",
Expand Down
4 changes: 2 additions & 2 deletions pkg/timer/api/client_test.go
Expand Up @@ -246,7 +246,7 @@ func TestDefaultClient(t *testing.T) {
require.Empty(t, timer.EventData)
require.True(t, timer.EventStart.IsZero())
require.Equal(t, []byte("s1"), timer.SummaryData)
require.Equal(t, eventStart, timer.Watermark)
require.Equal(t, eventStart.Unix(), timer.Watermark.Unix())
require.Equal(t, EventExtra{}, timer.EventExtra)

// close event with option
Expand All @@ -267,7 +267,7 @@ func TestDefaultClient(t *testing.T) {
require.Empty(t, timer.EventData)
require.True(t, timer.EventStart.IsZero())
require.Equal(t, []byte("s2"), timer.SummaryData)
require.Equal(t, watermark, timer.Watermark)
require.Equal(t, watermark.Unix(), timer.Watermark.Unix())

// manual trigger
err = store.Update(ctx, timer.ID, &TimerUpdate{
Expand Down
21 changes: 21 additions & 0 deletions pkg/timer/api/mem_store.go
Expand Up @@ -84,6 +84,8 @@ func (s *memoryStoreCore) Create(_ context.Context, record *TimerRecord) (string
record.EventStatus = SchedEventIdle
}

normalizeTimeFields(record)

if _, ok := s.id2Timers[record.ID]; ok {
return "", errors.Trace(ErrTimerExists)
}
Expand Down Expand Up @@ -137,6 +139,7 @@ func (s *memoryStoreCore) Update(_ context.Context, timerID string, update *Time
return err
}

normalizeTimeFields(newRecord)
if err = newRecord.Validate(); err != nil {
return err
}
Expand Down Expand Up @@ -303,3 +306,21 @@ func getMemStoreTimeZoneLoc(tz string) *time.Location {

return timeutil.SystemLocation()
}

func normalizeTimeFields(record *TimerRecord) {
if record.Location == nil {
return
}

if !record.Watermark.IsZero() {
record.Watermark = record.Watermark.In(record.Location)
}

if !record.EventStart.IsZero() {
record.EventStart = record.EventStart.In(record.Location)
}

if !record.CreateTime.IsZero() {
record.CreateTime = record.CreateTime.In(record.Location)
}
}
2 changes: 1 addition & 1 deletion pkg/timer/runtime/worker_test.go
Expand Up @@ -106,7 +106,7 @@ func prepareTimer(t *testing.T, cli api.TimerClient) *api.TimerRecord {
require.Equal(t, "1m", timer.SchedPolicyExpr)
require.Equal(t, "h1", timer.HookClass)
require.True(t, timer.Enable)
require.Equal(t, watermark, timer.Watermark)
require.Equal(t, watermark.Unix(), timer.Watermark.Unix())
require.Equal(t, []byte("summary1"), timer.SummaryData)
require.True(t, !timer.CreateTime.Before(now))
require.True(t, !timer.CreateTime.After(time.Now()))
Expand Down
136 changes: 136 additions & 0 deletions pkg/timer/store_intergartion_test.go
Expand Up @@ -16,6 +16,7 @@ package timer_test

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -256,6 +257,7 @@ func runTimerStoreUpdate(ctx context.Context, t *testing.T, store *api.TimerStor
EventManualRequestID: "req2",
EventWatermark: time.Unix(456, 0),
}
tpl.CreateTime = tpl.CreateTime.In(time.UTC)
require.Equal(t, *tpl, *record)

// tags full update again
Expand Down Expand Up @@ -328,6 +330,7 @@ func runTimerStoreUpdate(ctx context.Context, t *testing.T, store *api.TimerStor
tpl.EventExtra = api.EventExtra{}
tpl.Watermark = zeroTime
tpl.SummaryData = nil
tpl.CreateTime = tpl.CreateTime.In(tpl.Location)
require.Equal(t, *tpl, *record)

// err check version
Expand Down Expand Up @@ -872,3 +875,136 @@ func TestTableStoreManualTrigger(t *testing.T) {
require.True(t, timer.ManualProcessed)
require.Equal(t, api.EventExtra{}, timer.EventExtra)
}

func TestTimerStoreWithTimeZone(t *testing.T) {
// mem store
testTimerStoreWithTimeZone(t, api.NewMemoryTimerStore(), timeutil.SystemLocation().String())

// table store
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
dbName := "test"
tblName := "timerstore"
tk.MustExec("use test")
tk.MustExec(tablestore.CreateTimerTableSQL(dbName, tblName))
tk.MustExec("set @@time_zone = 'America/Los_Angeles'")

pool := pools.NewResourcePool(func() (pools.Resource, error) {
return tk.Session(), nil
}, 1, 1, time.Second)
defer pool.Close()

timerStore := tablestore.NewTableTimerStore(1, pool, dbName, tblName, nil)
defer timerStore.Close()

testTimerStoreWithTimeZone(t, timerStore, timeutil.SystemLocation().String())
tk.MustExec("set @@global.time_zone='Asia/Tokyo'")
tk.MustExec(fmt.Sprintf("truncate table %s.%s", dbName, tblName))
testTimerStoreWithTimeZone(t, timerStore, "Asia/Tokyo")

// check time zone should be set back to the previous one.
require.Equal(t, "America/Los_Angeles", tk.Session().GetSessionVars().Location().String())
}

func testTimerStoreWithTimeZone(t *testing.T, timerStore *api.TimerStore, defaultTZ string) {
// 2024-11-03 09:30:00 UTC is 2024-11-03 01:30:00 -08:00 in `America/Los_Angeles`
// We should notice that it should not be regarded as 2024-11-03 01:30:00 -07:00
// because of DST these two times have the same format in time zone `America/Los_Angeles`.
time1, err := time.ParseInLocation(time.DateTime, "2024-11-03 09:30:00", time.UTC)
require.NoError(t, err)

time2, err := time.ParseInLocation(time.DateTime, "2024-11-03 08:30:00", time.UTC)
require.NoError(t, err)

id1, err := timerStore.Create(context.TODO(), &api.TimerRecord{
TimerSpec: api.TimerSpec{
Namespace: "default",
Key: "test1",
SchedPolicyType: api.SchedEventInterval,
SchedPolicyExpr: "1h",
Watermark: time1,
},
EventStatus: api.SchedEventTrigger,
EventStart: time2,
})
require.NoError(t, err)

id2, err := timerStore.Create(context.TODO(), &api.TimerRecord{
TimerSpec: api.TimerSpec{
Namespace: "default",
Key: "test2",
SchedPolicyType: api.SchedEventInterval,
SchedPolicyExpr: "1h",
Watermark: time2,
},
EventStatus: api.SchedEventTrigger,
EventStart: time1,
})
require.NoError(t, err)

// create case
timer1, err := timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time1.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time2.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, defaultTZ)

timer2, err := timerStore.GetByID(context.TODO(), id2)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer2.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer2.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer2, defaultTZ)

// update time
require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{
Watermark: api.NewOptionalVal(time2),
EventStart: api.NewOptionalVal(time1),
}))

require.NoError(t, timerStore.Update(context.TODO(), id2, &api.TimerUpdate{
Watermark: api.NewOptionalVal(time1),
EventStart: api.NewOptionalVal(time2),
}))

timer1, err = timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, defaultTZ)

timer2, err = timerStore.GetByID(context.TODO(), id2)
require.NoError(t, err)
require.Equal(t, time1.In(time.UTC).String(), timer2.Watermark.In(time.UTC).String())
require.Equal(t, time2.In(time.UTC).String(), timer2.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer2, defaultTZ)

// update timezone
require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{
TimeZone: api.NewOptionalVal("Europe/Berlin"),
}))

timer1, err = timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, "Europe/Berlin")

require.NoError(t, timerStore.Update(context.TODO(), id1, &api.TimerUpdate{
TimeZone: api.NewOptionalVal(""),
}))

timer1, err = timerStore.GetByID(context.TODO(), id1)
require.NoError(t, err)
require.Equal(t, time2.In(time.UTC).String(), timer1.Watermark.In(time.UTC).String())
require.Equal(t, time1.In(time.UTC).String(), timer1.EventStart.In(time.UTC).String())
checkTimerRecordLocation(t, timer1, defaultTZ)
}

func checkTimerRecordLocation(t *testing.T, r *api.TimerRecord, tz string) {
require.Equal(t, tz, r.Location.String())
require.Same(t, r.Location, r.Watermark.Location())
require.Same(t, r.Location, r.CreateTime.Location())
if !r.EventStart.IsZero() {
require.Same(t, r.Location, r.EventStart.Location())
}
}
4 changes: 4 additions & 0 deletions pkg/timer/tablestore/BUILD.bazel
Expand Up @@ -39,9 +39,13 @@ go_test(
shard_count = 8,
deps = [
"//pkg/kv",
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/timer/api",
"//pkg/types",
"//pkg/util/sqlexec",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
Expand Down
77 changes: 76 additions & 1 deletion pkg/timer/tablestore/sql_test.go
Expand Up @@ -25,9 +25,13 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/timer/api"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -614,9 +618,56 @@ func TestTakeSession(t *testing.T) {
require.EqualError(t, err, "mockErr")
pool.AssertExpectations(t)

// Get returns a session
// init session returns error
se := &mockSession{}
pool.On("Get").Return(se, nil).Once()
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, errors.New("mockErr")).
Once()
pool.On("Put", se).Once()
r, back, err = core.takeSession()
require.Nil(t, r)
require.Nil(t, back)
require.EqualError(t, err, "mockErr")
pool.AssertExpectations(t)
se.AssertExpectations(t)

// init session returns error2
pool.On("Get").Return(se, nil).Once()
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)).
Return(nil, errors.New("mockErr2")).
Once()
pool.On("Put", se).Once()
r, back, err = core.takeSession()
require.Nil(t, r)
require.Nil(t, back)
require.EqualError(t, err, "mockErr2")
pool.AssertExpectations(t)
se.AssertExpectations(t)

// Get returns a session
pool.On("Get").Return(se, nil).Once()
rs := &sqlexec.SimpleRecordSet{
ResultFields: []*ast.ResultField{{
Column: &model.ColumnInfo{
FieldType: *types.NewFieldType(mysql.TypeString),
},
}},
MaxChunkSize: 1,
Rows: [][]any{{"tz1"}},
}
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)).
Return(rs, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone='UTC'", []any(nil)).
Return(nil, nil).
Once()
r, back, err = core.takeSession()
require.Equal(t, r, se)
require.NotNil(t, back)
Expand All @@ -633,15 +684,39 @@ func TestTakeSession(t *testing.T) {
pool.AssertExpectations(t)
se.AssertExpectations(t)

// Put session failed2
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone=%?", []any{"tz1"}).
Return(nil, errors.New("mockErr2")).
Once()
se.On("Close").Once()
back()
pool.AssertExpectations(t)
se.AssertExpectations(t)

// Put session success
pool.On("Get").Return(se, nil).Once()
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SELECT @@time_zone", []any(nil)).
Return(rs, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone='UTC'", []any(nil)).
Return(nil, nil).
Once()
r, back, err = core.takeSession()
require.Equal(t, r, se)
require.NotNil(t, back)
require.Nil(t, err)
se.On("ExecuteInternal", matchCtx, "ROLLBACK", []any(nil)).
Return(nil, nil).
Once()
se.On("ExecuteInternal", matchCtx, "SET @@time_zone=%?", []any{"tz1"}).
Return(nil, nil).
Once()
pool.On("Put", se).Once()
back()
pool.AssertExpectations(t)
Expand Down

0 comments on commit d708569

Please sign in to comment.