Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*:Rollback config in store when kv.persist failed #1476

Merged
merged 34 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b966728
Merge pull request #1 from pingcap/master
bradyjoestar Mar 20, 2019
b263e13
Merge pull request #2 from pingcap/master
bradyjoestar Mar 24, 2019
3af8fe3
clear store when persist failed
swbsin Mar 25, 2019
7c02023
log more info
swbsin Mar 25, 2019
bc77251
gofmt
swbsin Mar 25, 2019
c677fd2
fix error info
swbsin Mar 25, 2019
98f9c4b
Merge branch 'master' into issue-1475
bradyjoestar Mar 25, 2019
5abed59
Merge branch 'master' into issue-1475
bradyjoestar Mar 27, 2019
8e1ec51
Merge branch 'master' into issue-1475
bradyjoestar Mar 27, 2019
a77a952
add unit_test
swbsin Mar 27, 2019
3a758c3
try to debug jenkins error
swbsin Mar 27, 2019
237f5a5
try to fix jenkins bug
swbsin Mar 27, 2019
892d1b7
try to fix jenkins error
swbsin Mar 27, 2019
19a566b
fix data race bug
swbsin Mar 27, 2019
b5fbb9d
prolong time for region kv to flush
swbsin Mar 27, 2019
5548cdc
Update region_syncer_test.go
bradyjoestar Mar 27, 2019
ea2da65
better output log
swbsin Mar 29, 2019
2441a70
Merge branch 'master' into issue-1475
nolouch Apr 9, 2019
800ed08
Merge branch 'master' into issue-1475
bradyjoestar Apr 10, 2019
300ac33
convert 5s to 3s
bradyjoestar Apr 11, 2019
8a090f4
Merge branch 'issue-1475' of github.com:bradyjoestar/pd into issue-1475
bradyjoestar Apr 11, 2019
fbb7cc8
Merge branch 'master' into issue-1475
nolouch Apr 15, 2019
54e34c0
Merge branch 'master' into issue-1475
bradyjoestar Apr 17, 2019
28689dc
replace sync.map
bradyjoestar Apr 17, 2019
bd2754c
fix ci bug
bradyjoestar Apr 17, 2019
2f777cc
rebuild jenkins
bradyjoestar Apr 17, 2019
6ad3899
Merge branch 'master' into issue-1475
bradyjoestar Apr 18, 2019
51e9451
Merge branch 'master' into issue-1475
bradyjoestar Apr 19, 2019
21c62e4
Merge branch 'master' into issue-1475
nolouch Apr 25, 2019
4b64562
extract function
bradyjoestar May 21, 2019
0ec5424
go fmt
bradyjoestar May 21, 2019
4db8982
revert system_mon
bradyjoestar May 21, 2019
0446e27
Merge branch 'master' into issue-1475
bradyjoestar May 21, 2019
b710e24
Merge branch 'master' into issue-1475
nolouch May 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions server/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package server
import (
"context"
"fmt"
"github.com/pkg/errors"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems not in good order. We recommend to put it with other 3rd part packages.

"strings"
"sync"
"time"
Expand Down Expand Up @@ -44,6 +45,14 @@ type testClusterSuite struct {
baseCluster
}

type testErrorKV struct {
core.KVBase
}

func (kv *testErrorKV) Save(key, value string) error {
return errors.New("save failed")
}

func mustNewGrpcClient(c *C, addr string) pdpb.PDClient {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithInsecure())

Expand Down Expand Up @@ -600,3 +609,83 @@ func (s *testGetStoresSuite) BenchmarkGetStores(c *C) {
s.cluster.core.Stores.GetStores()
}
}

func (s *testClusterSuite) TestSetScheduleOpt(c *C) {
var err error
var cleanup func()
_, s.svr, cleanup, err = NewTestServer(c)
c.Assert(err, IsNil)
mustWaitLeader(c, []*Server{s.svr})
s.grpcPDClient = mustNewGrpcClient(c, s.svr.GetAddr())
defer cleanup()
clusterID := s.svr.clusterID

storeAddr := "127.0.0.1:0"
_, err = s.svr.bootstrapCluster(s.newBootstrapRequest(c, clusterID, storeAddr))
c.Assert(err, IsNil)

_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)

scheduleCfg := opt.load()
replicateCfg := s.svr.GetReplicationConfig()
pdServerCfg := s.svr.scheduleOpt.loadPDServerConfig()

//PUT GET DELETE successed
replicateCfg.MaxReplicas = 5
scheduleCfg.MaxSnapshotCount = 10
pdServerCfg.UseRegionStorage = true
typ, labelKey, labelValue := "testTyp", "testKey", "testValue"
nsConfig := NamespaceConfig{LeaderScheduleLimit: uint64(200)}

c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), IsNil)
c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), IsNil)
c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), IsNil)
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))

c.Assert(s.svr.DeleteNamespaceConfig("testNS"), IsNil)
c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), IsNil)

c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

//PUT GET failed
oldKV := s.svr.kv
s.svr.kv = core.NewKV(&testErrorKV{})
replicateCfg.MaxReplicas = 7
scheduleCfg.MaxSnapshotCount = 20
pdServerCfg.UseRegionStorage = false

c.Assert(s.svr.SetScheduleConfig(*scheduleCfg), NotNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), NotNil)
c.Assert(s.svr.SetPDServerConfig(*pdServerCfg), NotNil)
c.Assert(s.svr.SetLabelProperty(typ, labelKey, labelValue), NotNil)
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), NotNil)

c.Assert(s.svr.GetReplicationConfig().MaxReplicas, Equals, uint64(5))
c.Assert(s.svr.scheduleOpt.GetMaxSnapshotCount(), Equals, uint64(10))
c.Assert(s.svr.scheduleOpt.loadPDServerConfig().UseRegionStorage, Equals, true)
c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(0))
c.Assert(len(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ]), Equals, 0)

//DELETE failed
s.svr.kv = oldKV
c.Assert(s.svr.SetNamespaceConfig("testNS", nsConfig), IsNil)
c.Assert(s.svr.SetReplicationConfig(*replicateCfg), IsNil)

s.svr.kv = core.NewKV(&testErrorKV{})
c.Assert(s.svr.DeleteLabelProperty(typ, labelKey, labelValue), NotNil)
c.Assert(s.svr.DeleteNamespaceConfig("testNS"), NotNil)

c.Assert(s.svr.GetNamespaceConfig("testNS").LeaderScheduleLimit, Equals, uint64(200))
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Key, Equals, "testKey")
c.Assert(s.svr.scheduleOpt.loadLabelPropertyConfig()[typ][0].Value, Equals, "testValue")
}
52 changes: 51 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,11 @@ func (s *Server) SetScheduleConfig(cfg ScheduleConfig) error {
old := s.scheduleOpt.load()
s.scheduleOpt.store(&cfg)
if err := s.scheduleOpt.persist(s.kv); err != nil {
s.scheduleOpt.store(old)
log.Error("failed to update schedule config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
zap.Error(err))
return err
}
log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand All @@ -557,6 +562,11 @@ func (s *Server) SetReplicationConfig(cfg ReplicationConfig) error {
old := s.scheduleOpt.rep.load()
s.scheduleOpt.rep.store(&cfg)
if err := s.scheduleOpt.persist(s.kv); err != nil {
s.scheduleOpt.rep.store(old)
log.Error("failed to update replication config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
zap.Error(err))
return err
}
log.Info("replication config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand All @@ -568,6 +578,11 @@ func (s *Server) SetPDServerConfig(cfg PDServerConfig) error {
old := s.scheduleOpt.loadPDServerConfig()
s.scheduleOpt.pdServerConfig.Store(&cfg)
if err := s.scheduleOpt.persist(s.kv); err != nil {
s.scheduleOpt.pdServerConfig.Store(old)
log.Error("failed to update PDServer config",
zap.Reflect("new", cfg),
zap.Reflect("old", old),
zap.Error(err))
return err
}
log.Info("PD server config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
Expand Down Expand Up @@ -605,12 +620,23 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error {
old := s.scheduleOpt.ns[name].load()
n.store(&cfg)
if err := s.scheduleOpt.persist(s.kv); err != nil {
s.scheduleOpt.ns[name].store(old)
log.Error("failed to update namespace config",
zap.String("name", name),
zap.Reflect("new", cfg),
zap.Reflect("old", old),
zap.Error(err))
return err
}
log.Info("namespace config is updated", zap.String("name", name), zap.Reflect("new", cfg), zap.Reflect("old", old))
} else {
s.scheduleOpt.ns[name] = newNamespaceOption(&cfg)
if err := s.scheduleOpt.persist(s.kv); err != nil {
s.scheduleOpt.ns[name].store(&NamespaceConfig{})
log.Error("failed to add namespace config",
zap.String("name", name),
zap.Reflect("new", cfg),
zap.Error(err))
return err
}
log.Info("namespace config is added", zap.String("name", name), zap.Reflect("new", cfg))
Expand All @@ -622,8 +648,12 @@ func (s *Server) SetNamespaceConfig(name string, cfg NamespaceConfig) error {
func (s *Server) DeleteNamespaceConfig(name string) error {
if n, ok := s.scheduleOpt.ns[name]; ok {
cfg := n.load()
delete(s.scheduleOpt.ns, name)
s.scheduleOpt.ns[name].store(&NamespaceConfig{})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use delete?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete is not atomic, It may cause DATA RACE error.
I have met it in running test.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this way may get error config? all default is zero.

Copy link
Contributor Author

@bradyjoestar bradyjoestar Apr 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/pingcap/pd/blob/9ba0e4453c3ae4581f2ea0d52c2f7803d57905ca/server/server.go#L578-L582

If ns[name] doesnt exist,it will return a &NamespaceConfig{} , so in a certain sense they are equivalent, I thought

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it.

if err := s.scheduleOpt.persist(s.kv); err != nil {
s.scheduleOpt.ns[name].store(cfg)
log.Error("failed to delete namespace config",
zap.String("name", name),
zap.Error(err))
return err
}
log.Info("namespace config is deleted", zap.String("name", name), zap.Reflect("config", *cfg))
Expand All @@ -636,6 +666,13 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error {
s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue)
err := s.scheduleOpt.persist(s.kv)
if err != nil {
s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue)
log.Error("failed to update label property config",
zap.String("typ", typ),
zap.String("labelKey", labelKey),
zap.String("labelValue", labelValue),
zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()),
zap.Error(err))
return err
}
log.Info("label property config is updated", zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()))
Expand All @@ -647,6 +684,13 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error {
s.scheduleOpt.DeleteLabelProperty(typ, labelKey, labelValue)
err := s.scheduleOpt.persist(s.kv)
if err != nil {
s.scheduleOpt.SetLabelProperty(typ, labelKey, labelValue)
log.Error("failed to delete label property config",
zap.String("typ", typ),
zap.String("labelKey", labelKey),
zap.String("labelValue", labelValue),
zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()),
zap.Error(err))
return err
}
log.Info("label property config is deleted", zap.Reflect("config", s.scheduleOpt.loadLabelPropertyConfig()))
Expand All @@ -664,9 +708,15 @@ func (s *Server) SetClusterVersion(v string) error {
if err != nil {
return err
}
old := s.scheduleOpt.loadClusterVersion()
s.scheduleOpt.SetClusterVersion(*version)
err = s.scheduleOpt.persist(s.kv)
if err != nil {
s.scheduleOpt.SetClusterVersion(old)
log.Error("failed to update cluster version",
zap.String("old-version", old.String()),
zap.String("new-version", v),
zap.Error(err))
return err
}
log.Info("cluster version is updated", zap.String("new-version", v))
Expand Down
7 changes: 5 additions & 2 deletions server/systime_mon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ func StartMonitor(now func() time.Time, systimeErrHandler func()) {
for {
last := now().UnixNano()
<-tick.C
if now().UnixNano() < last {
log.Error("system time jump backward", zap.Int64("last", last))
now := now().UnixNano()
if now < last {
log.Error("system time jump backward",
zap.Int64("last", last),
zap.Int64("now", now))
systimeErrHandler()
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/systime_mon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
func TestSystimeMonitor(t *testing.T) {
var jumpForward int32

trigged := false
var trigged int32 = 0
go StartMonitor(
func() time.Time {
if !trigged {
trigged = true
if atomic.LoadInt32(&trigged) != 1 {
atomic.StoreInt32(&trigged, 1)
return time.Now()
}

Expand Down
4 changes: 2 additions & 2 deletions tests/server/region_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *serverTestSuite) TestRegionSyncer(c *C) {
c.Assert(err, IsNil)
}
// ensure flush to region kv
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?

Copy link
Contributor Author

@bradyjoestar bradyjoestar Mar 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once I have met the following logs in jenkins

obtain value: 91
expect value:110

Maybe the time isnt enough? I guess. 😀 :

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I doubt that this change can solve this problem actually. /cc @nolouch

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, maybe relate to the ci environment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restore to 3s.

err = leaderServer.Stop()
c.Assert(err, IsNil)
cluster.WaitLeader()
Expand Down Expand Up @@ -113,7 +113,7 @@ func (s *serverTestSuite) TestFullSyncWithAddMember(c *C) {
c.Assert(err, IsNil)
}
// ensure flush to region kv
time.Sleep(3 * time.Second)
time.Sleep(5 * time.Second)
// restart pd1
err = leaderServer.Stop()
c.Assert(err, IsNil)
Expand Down