Skip to content

Commit

Permalink
Improve etcd sd graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Hahn committed Mar 12, 2019
1 parent 7761bf6 commit 6fffaf8
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 14 deletions.
23 changes: 12 additions & 11 deletions cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type etcdServiceDiscovery struct {
grantLeaseTimeout time.Duration
grantLeaseMaxRetries int
grantLeaseInterval time.Duration
shutdownDelay time.Duration
appDieChan chan bool
}

Expand Down Expand Up @@ -98,6 +99,7 @@ func (sd *etcdServiceDiscovery) configure() {
sd.grantLeaseTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.timeout")
sd.grantLeaseMaxRetries = sd.config.GetInt("pitaya.cluster.sd.etcd.grantlease.maxretries")
sd.grantLeaseInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.retryinterval")
sd.shutdownDelay = sd.config.GetDuration("pitaya.cluster.sd.etcd.shutdown.delay")
}

func (sd *etcdServiceDiscovery) watchLeaseChan(c <-chan *clientv3.LeaseKeepAliveResponse) {
Expand Down Expand Up @@ -196,9 +198,7 @@ func (sd *etcdServiceDiscovery) addServerIntoEtcd(server *Server) error {
}

func (sd *etcdServiceDiscovery) bootstrapServer(server *Server) error {
// put key
err := sd.addServerIntoEtcd(server)
if err != nil {
if err := sd.addServerIntoEtcd(server); err != nil {
return err
}

Expand All @@ -216,10 +216,6 @@ func (sd *etcdServiceDiscovery) AddListener(listener SDListener) {
func (sd *etcdServiceDiscovery) AfterInit() {
}

// BeforeShutdown executes before shutting down
func (sd *etcdServiceDiscovery) BeforeShutdown() {
}

func (sd *etcdServiceDiscovery) notifyListeners(act Action, sv *Server) {
for _, l := range sd.listeners {
if act == DEL {
Expand Down Expand Up @@ -382,7 +378,6 @@ func (sd *etcdServiceDiscovery) printServers() {
logger.Log.Debugf("type: %s, servers: %s", k, v)
return true
})

}

// SyncServers gets all servers from etcd
Expand Down Expand Up @@ -425,11 +420,17 @@ func (sd *etcdServiceDiscovery) SyncServers() error {
return nil
}

// BeforeShutdown executes before shutting down and will remove the server from the list
func (sd *etcdServiceDiscovery) BeforeShutdown() {
sd.revoke()
time.Sleep(sd.shutdownDelay) // Sleep for a short while to ensure shutdown has propagated
}

// Shutdown executes on shutdown and will clean etcd
func (sd *etcdServiceDiscovery) Shutdown() error {
sd.running = false
close(sd.stopChan)
return sd.revoke()
return nil
}

// revoke prevents Pitaya from crashing when etcd is not available
Expand Down Expand Up @@ -478,7 +479,7 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
var sv *Server
var err error
if sv, err = parseServer(ev.Kv.Value); err != nil {
logger.Log.Error(err)
logger.Log.Errorf("Failed to parse server from etcd: %v", err)
continue
}
sd.addServer(sv)
Expand All @@ -487,7 +488,7 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
case clientv3.EventTypeDelete:
_, svID, err := parseEtcdKey(string(ev.Kv.Key))
if err != nil {
logger.Log.Warn("failed to parse key from etcd: %s", ev.Kv.Key)
logger.Log.Warnf("failed to parse key from etcd: %s", ev.Kv.Key)
continue
}
sd.deleteServer(svID)
Expand Down
20 changes: 18 additions & 2 deletions cluster/etcd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,24 @@ func TestEtcdSDInit(t *testing.T) {
}
}

func TestEtcdBeforeShutdown(t *testing.T) {
t.Parallel()
for _, table := range etcdSDTables {
t.Run(table.server.ID, func(t *testing.T) {
config := getConfig()
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, cli)
e.Init()
assert.True(t, e.running)
e.BeforeShutdown()
assert.True(t, e.running)
_, err := cli.Revoke(context.TODO(), e.leaseID)
assert.Error(t, err)
})
}
}

func TestEtcdShutdown(t *testing.T) {
t.Parallel()
for _, table := range etcdSDTables {
Expand All @@ -272,8 +290,6 @@ func TestEtcdShutdown(t *testing.T) {
assert.True(t, e.running)
e.Shutdown()
assert.False(t, e.running)
_, err := cli.Revoke(context.TODO(), e.leaseID)
assert.Error(t, err)
})
}
}
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.sd.etcd.prefix": "pitaya/",
"pitaya.cluster.sd.etcd.revoke.timeout": "5s",
"pitaya.cluster.sd.etcd.syncservers.interval": "120s",
"pitaya.cluster.sd.etcd.shutdown.delay": "10ms",
// the sum of this config among all the frontend servers should always be less than
// the sum of pitaya.buffer.cluster.rpc.server.nats.messages, for covering the worst case scenario
// a single backend server should have the config pitaya.buffer.cluster.rpc.server.nats.messages bigger
Expand Down
6 changes: 5 additions & 1 deletion docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@ These configuration values configure service discovery for the default etcd serv
- Prefix used to avoid collisions with different pitaya applications, servers must have the same prefix to be able to see each other
* - pitaya.cluster.sd.etcd.syncservers.interval
- 120s
- time.Time
- time.Duration
- Interval between server syncs performed by the service discovery module
* - pitaya.cluster.sd.etcd.shutdown.delay
- 10ms
- time.Duration
- Time to wait to shutdown after deregistering from service discovery

RPC Service
===========
Expand Down

0 comments on commit 6fffaf8

Please sign in to comment.