Skip to content

Commit 7455d4e

Browse files
frouiouiJohan Stenberg
andauthored
Reset the current lag when closing the replication lag reader. (vitessio#12683) (vitessio#12744)
* Reset the current lag when closing the replication lag reader. * Fix tests. --------- Signed-off-by: Johan Stenberg <johanstenberg92@github.com> Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr> Co-authored-by: Johan Stenberg <johanstenberg92@github.com>
1 parent 99b88e8 commit 7455d4e

File tree

3 files changed

+63
-17
lines changed

3 files changed

+63
-17
lines changed

go/vt/vttablet/tabletserver/repltracker/reader.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ func (r *heartbeatReader) Close() {
122122
}
123123
r.ticks.Stop()
124124
r.pool.Close()
125+
126+
currentLagNs.Set(0)
127+
125128
r.isOpen = false
126129
log.Info("Heartbeat Reader: closed")
127130
}

go/vt/vttablet/tabletserver/repltracker/reader_test.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,13 @@ import (
3939
func TestReaderReadHeartbeat(t *testing.T) {
4040
db := fakesqldb.New(t)
4141
defer db.Close()
42-
tr := newReader(db, mockNowFunc)
42+
43+
now := time.Now()
44+
tr := newReader(db, &now)
4345
defer tr.Close()
4446

47+
tr.pool.Open(tr.env.Config().DB.AppWithDB(), tr.env.Config().DB.DbaWithDB(), tr.env.Config().DB.AppDebugWithDB())
48+
4549
db.AddQuery(fmt.Sprintf("SELECT ts FROM %s.heartbeat WHERE keyspaceShard='%s'", "_vt", tr.keyspaceShard), &sqltypes.Result{
4650
Fields: []*querypb.Field{
4751
{Name: "ts", Type: sqltypes.Int64},
@@ -79,14 +83,46 @@ func TestReaderReadHeartbeat(t *testing.T) {
7983
utils.MustMatch(t, expectedHisto, heartbeatLagNsHistogram.Counts(), "wrong counts in histogram")
8084
}
8185

86+
// TestReaderCloseSetsCurrentLagToZero tests that when closing the heartbeat reader, the current lag is
87+
// set to zero.
88+
func TestReaderCloseSetsCurrentLagToZero(t *testing.T) {
89+
db := fakesqldb.New(t)
90+
defer db.Close()
91+
tr := newReader(db, nil)
92+
93+
db.AddQuery(fmt.Sprintf("SELECT ts FROM %s.heartbeat WHERE keyspaceShard='%s'", "_vt", tr.keyspaceShard), &sqltypes.Result{
94+
Fields: []*querypb.Field{
95+
{Name: "ts", Type: sqltypes.Int64},
96+
},
97+
Rows: [][]sqltypes.Value{{
98+
sqltypes.NewInt64(time.Now().Add(-10 * time.Second).UnixNano()),
99+
}},
100+
})
101+
102+
currentLagNs.Reset()
103+
104+
tr.Open()
105+
time.Sleep(2 * time.Second)
106+
107+
assert.Greater(t, currentLagNs.Get(), int64(0), "lag should be greater than zero")
108+
109+
tr.Close()
110+
111+
assert.Equal(t, int64(0), currentLagNs.Get(), "lag should be be zero after closing the reader.")
112+
}
113+
82114
// TestReaderReadHeartbeatError tests that we properly account for errors
83115
// encountered in the reading of heartbeat.
84116
func TestReaderReadHeartbeatError(t *testing.T) {
85117
db := fakesqldb.New(t)
86118
defer db.Close()
87-
tr := newReader(db, mockNowFunc)
119+
120+
now := time.Now()
121+
tr := newReader(db, &now)
88122
defer tr.Close()
89123

124+
tr.pool.Open(tr.env.Config().DB.AppWithDB(), tr.env.Config().DB.DbaWithDB(), tr.env.Config().DB.AppDebugWithDB())
125+
90126
cumulativeLagNs.Reset()
91127
readErrors.Reset()
92128

@@ -100,18 +136,23 @@ func TestReaderReadHeartbeatError(t *testing.T) {
100136
assert.Equal(t, int64(1), readErrors.Get(), "wrong read error count")
101137
}
102138

103-
func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatReader {
139+
func newReader(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatReader {
104140
config := tabletenv.NewDefaultConfig()
105141
config.ReplicationTracker.Mode = tabletenv.Heartbeat
106142
config.ReplicationTracker.HeartbeatIntervalSeconds = 1
107143
params, _ := db.ConnParams().MysqlParams()
108144
cp := *params
109145
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
146+
config.DB = dbc
110147

111148
tr := newHeartbeatReader(tabletenv.NewEnv(config, "ReaderTest"))
112149
tr.keyspaceShard = "test:0"
113-
tr.now = nowFunc
114-
tr.pool.Open(dbc.AppWithDB(), dbc.DbaWithDB(), dbc.AppDebugWithDB())
150+
151+
if frozenTime != nil {
152+
tr.now = func() time.Time {
153+
return *frozenTime
154+
}
155+
}
115156

116157
return tr
117158
}

go/vt/vttablet/tabletserver/repltracker/writer_test.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,11 @@ import (
3232
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
3333
)
3434

35-
var (
36-
now = time.Now()
37-
mockNowFunc = func() time.Time {
38-
return now
39-
}
40-
)
41-
4235
func TestCreateSchema(t *testing.T) {
4336
db := fakesqldb.New(t)
4437
defer db.Close()
45-
tw := newTestWriter(db, mockNowFunc)
38+
now := time.Now()
39+
tw := newTestWriter(db, &now)
4640
defer tw.Close()
4741
writes.Reset()
4842

@@ -66,7 +60,8 @@ func TestWriteHeartbeat(t *testing.T) {
6660
db := fakesqldb.New(t)
6761
defer db.Close()
6862

69-
tw := newTestWriter(db, mockNowFunc)
63+
now := time.Now()
64+
tw := newTestWriter(db, &now)
7065
upsert := fmt.Sprintf("INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%d, %d, '%s') ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)",
7166
"_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard)
7267
db.AddQuery(upsert, &sqltypes.Result{})
@@ -83,7 +78,8 @@ func TestWriteHeartbeatError(t *testing.T) {
8378
db := fakesqldb.New(t)
8479
defer db.Close()
8580

86-
tw := newTestWriter(db, mockNowFunc)
81+
now := time.Now()
82+
tw := newTestWriter(db, &now)
8783

8884
writes.Reset()
8985
writeErrors.Reset()
@@ -93,7 +89,7 @@ func TestWriteHeartbeatError(t *testing.T) {
9389
assert.Equal(t, int64(1), writeErrors.Get())
9490
}
9591

96-
func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatWriter {
92+
func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter {
9793
config := tabletenv.NewDefaultConfig()
9894
config.ReplicationTracker.Mode = tabletenv.Heartbeat
9995
config.ReplicationTracker.HeartbeatIntervalSeconds = 1
@@ -104,7 +100,13 @@ func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *heartbeatWriter
104100

105101
tw := newHeartbeatWriter(tabletenv.NewEnv(config, "WriterTest"), &topodatapb.TabletAlias{Cell: "test", Uid: 1111})
106102
tw.keyspaceShard = "test:0"
107-
tw.now = nowFunc
103+
104+
if frozenTime != nil {
105+
tw.now = func() time.Time {
106+
return *frozenTime
107+
}
108+
}
109+
108110
tw.appPool.Open(dbc.AppWithDB())
109111
tw.allPrivsPool.Open(dbc.AllPrivsWithDB())
110112

0 commit comments

Comments
 (0)