diff --git a/cluster/etcd_service_discovery.go b/cluster/etcd_service_discovery.go index 893cc277..1b51ed9b 100644 --- a/cluster/etcd_service_discovery.go +++ b/cluster/etcd_service_discovery.go @@ -37,26 +37,28 @@ import ( ) type etcdServiceDiscovery struct { - cli *clientv3.Client - config *config.Config - syncServersInterval time.Duration - heartbeatTTL time.Duration - logHeartbeat bool - lastHeartbeatTime time.Time - leaseID clientv3.LeaseID - serverMapByType sync.Map - serverMapByID sync.Map - etcdEndpoints []string - etcdPrefix string - etcdDialTimeout time.Duration - running bool - server *Server - stopChan chan bool - lastSyncTime time.Time - listeners []SDListener - revokeTimeout time.Duration - bootstrapTimeout time.Duration - appDieChan chan bool + cli *clientv3.Client + config *config.Config + syncServersInterval time.Duration + heartbeatTTL time.Duration + logHeartbeat bool + lastHeartbeatTime time.Time + leaseID clientv3.LeaseID + serverMapByType sync.Map + serverMapByID sync.Map + etcdEndpoints []string + etcdPrefix string + etcdDialTimeout time.Duration + running bool + server *Server + stopChan chan bool + lastSyncTime time.Time + listeners []SDListener + revokeTimeout time.Duration + grantLeaseTimeout time.Duration + grantLeaseMaxRetries int + grantLeaseInterval time.Duration + appDieChan chan bool } // NewEtcdServiceDiscovery ctor @@ -71,13 +73,13 @@ func NewEtcdServiceDiscovery( client = cli[0] } sd := &etcdServiceDiscovery{ - config: config, - running: false, - server: server, - listeners: make([]SDListener, 0), - stopChan: make(chan bool), + config: config, + running: false, + server: server, + listeners: make([]SDListener, 0), + stopChan: make(chan bool), appDieChan: appDieChan, - cli: client, + cli: client, } sd.configure() @@ -93,37 +95,77 @@ func (sd *etcdServiceDiscovery) configure() { sd.logHeartbeat = sd.config.GetBool("pitaya.cluster.sd.etcd.heartbeat.log") sd.syncServersInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.syncservers.interval") sd.revokeTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.revoke.timeout") - sd.bootstrapTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.bootstrap.timeout") + sd.grantLeaseTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.timeout") + sd.grantLeaseMaxRetries = sd.config.GetInt("pitaya.cluster.sd.etcd.grantlease.maxretries") + sd.grantLeaseInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.retryinterval") } func (sd *etcdServiceDiscovery) watchLeaseChan(c <-chan *clientv3.LeaseKeepAliveResponse) { + failedGrantLeaseAttempts := 0 for { select { case <-sd.stopChan: return - case kaRes := <-c: - if kaRes == nil { - logger.Log.Warn("sd: error renewing etcd lease, rebootstrapping") - for { - err := sd.bootstrap() - if err != nil { - logger.Log.Warn("sd: error rebootstrapping lease, will retry in 5 seconds") - time.Sleep(5 * time.Second) - continue - } else { + case leaseKeepAliveResponse := <-c: + if leaseKeepAliveResponse != nil { + if sd.logHeartbeat { + logger.Log.Debugf("sd: etcd lease %x renewed", leaseKeepAliveResponse.ID) + } + failedGrantLeaseAttempts = 0 + continue + } + logger.Log.Warn("sd: error renewing etcd lease, reconfiguring") + for { + err := sd.renewLease() + if err != nil { + failedGrantLeaseAttempts = failedGrantLeaseAttempts + 1 + if err == constants.ErrEtcdGrantLeaseTimeout { + logger.Log.Warn("sd: timed out trying to grant etcd lease") + if sd.appDieChan != nil { + sd.appDieChan <- true + } return } + if failedGrantLeaseAttempts >= sd.grantLeaseMaxRetries { + logger.Log.Warn("sd: exceeded max attempts to renew etcd lease") + if sd.appDieChan != nil { + sd.appDieChan <- true + } + return + } + logger.Log.Warnf("sd: error granting etcd lease, will retry in %d seconds", uint64(sd.grantLeaseInterval.Seconds())) + time.Sleep(sd.grantLeaseInterval) + continue } - } else { - if sd.logHeartbeat { - logger.Log.Debugf("sd: etcd lease %x renewed", kaRes.ID) - } + return } } } } -func (sd *etcdServiceDiscovery) bootstrapLease() error { +// renewLease restablishes connection with etcd +func (sd *etcdServiceDiscovery) renewLease() error { + c := make(chan error) + go func() { + defer close(c) + logger.Log.Infof("waiting for etcd lease") + err := sd.grantLease() + if err != nil { + c <- err + return + } + err = sd.bootstrapServer(sd.server) + c <- err + }() + select { + case err := <-c: + return err + case <-time.After(sd.grantLeaseTimeout): + return constants.ErrEtcdGrantLeaseTimeout + } +} + +func (sd *etcdServiceDiscovery) grantLease() error { // grab lease l, err := sd.cli.Grant(context.TODO(), int64(sd.heartbeatTTL.Seconds())) if err != nil { @@ -239,28 +281,17 @@ func (sd *etcdServiceDiscovery) GetServersByType(serverType string) (map[string] } func (sd *etcdServiceDiscovery) bootstrap() error { - c := make(chan error) - defer close(c) - go func() { - logger.Log.Infof("waiting for etcd connection") - err := sd.bootstrapLease() - if err != nil { - c <- err - return - } - err = sd.bootstrapServer(sd.server) - c <- err - }() - select { - case err := <-c: + err := sd.grantLease() + if err != nil { return err - case <-time.After(sd.bootstrapTimeout): - logger.Log.Warn("timed out waiting for etcd connection") - if sd.appDieChan != nil { - sd.appDieChan <- true - } - return nil } + + err = sd.bootstrapServer(sd.server) + if err != nil { + return err + } + + return nil } // GetServer returns a server given it's id @@ -392,7 +423,7 @@ func (sd *etcdServiceDiscovery) Shutdown() error { } // revoke prevents Pitaya from crashing when etcd is not available -func (sd *etcdServiceDiscovery) revoke() error { +func (sd *etcdServiceDiscovery) revoke() error { c := make(chan error) defer close(c) go func() { diff --git a/cluster/etcd_service_discovery_test.go b/cluster/etcd_service_discovery_test.go index 7bf0c918..dcb199b1 100644 --- a/cluster/etcd_service_discovery_test.go +++ b/cluster/etcd_service_discovery_test.go @@ -76,7 +76,7 @@ func TestEtcdSDBootstrapLease(t *testing.T) { c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) e := getEtcdSD(t, config, table.server, cli) - err := e.bootstrapLease() + err := e.grantLease() assert.NoError(t, err) assert.NotEmpty(t, e.leaseID) }) @@ -91,7 +91,7 @@ func TestEtcdSDBootstrapLeaseError(t *testing.T) { c, cli := helpers.GetTestEtcd(t) c.Terminate(t) e := getEtcdSD(t, config, table.server, cli) - err := e.bootstrapLease() + err := e.grantLease() assert.Error(t, err) }) } @@ -105,7 +105,7 @@ func TestEtcdSDBootstrapServer(t *testing.T) { c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) e := getEtcdSD(t, config, table.server, cli) - e.bootstrapLease() + e.grantLease() err := e.bootstrapServer(table.server) assert.NoError(t, err) v, err := cli.Get(context.TODO(), getKey(table.server.ID, table.server.Type)) @@ -130,7 +130,7 @@ func TestEtcdSDDeleteServer(t *testing.T) { c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) e := getEtcdSD(t, config, table.server, cli) - e.bootstrapLease() + e.grantLease() err := e.bootstrapServer(table.server) assert.NoError(t, err) e.deleteServer(table.server.ID) @@ -190,7 +190,7 @@ func TestEtcdSDGetServer(t *testing.T) { c, cli := helpers.GetTestEtcd(t) defer c.Terminate(t) e := getEtcdSD(t, config, table.server, cli) - e.bootstrapLease() + e.grantLease() e.bootstrapServer(table.server) sv, err := e.GetServer(table.server.ID) assert.NoError(t, err) diff --git a/config/config.go b/config/config.go index e3cc837e..052092a8 100644 --- a/config/config.go +++ b/config/config.go @@ -74,7 +74,9 @@ func (c *Config) fillDefaultValues() { "pitaya.cluster.sd.etcd.endpoints": "localhost:2379", "pitaya.cluster.sd.etcd.prefix": "pitaya/", "pitaya.cluster.sd.etcd.heartbeat.ttl": "60s", - "pitaya.cluster.sd.etcd.bootstrap.timeout": "60s", + "pitaya.cluster.sd.etcd.grantlease.timeout": "60s", + "pitaya.cluster.sd.etcd.grantlease.maxretries": 15, + "pitaya.cluster.sd.etcd.grantlease.retryinterval": "5s", "pitaya.cluster.sd.etcd.revoke.timeout": "5s", "pitaya.cluster.sd.etcd.heartbeat.log": false, "pitaya.cluster.sd.etcd.syncservers.interval": "120s", diff --git a/constants/errors.go b/constants/errors.go index 7eec98bf..c6e7f7d0 100644 --- a/constants/errors.go +++ b/constants/errors.go @@ -35,6 +35,7 @@ var ( ErrCloseClosedGroup = errors.New("close closed group") ErrCloseClosedSession = errors.New("close closed session") ErrClosedGroup = errors.New("group closed") + ErrEtcdGrantLeaseTimeout = errors.New("timed out waiting for etcd lease grant") ErrFrontSessionCantPushToFront = errors.New("frontend session can't push to front") ErrIllegalUID = errors.New("illegal uid") ErrMemberNotFound = errors.New("member not found in the group") diff --git a/docs/configuration.rst b/docs/configuration.rst index 8fa899e8..18c41a00 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -31,10 +31,18 @@ These configuration values configure service discovery for the default etcd serv - 60s - time.Time - Hearbeat interval for the etcd lease - * - pitaya.cluster.sd.etcd.bootstrap.timeout + * - pitaya.cluster.sd.etcd.grantlease.timeout - 60s - time.Duration - - Connection timeout when waiting for etcd to become available + - Timeout for etcd lease + * - pitaya.cluster.sd.etcd.grantlease.maxretries + - 15 + - int + - Maximum number of attempts to etcd grant lease + * - pitaya.cluster.sd.etcd.grantlease.retryinterval + - 5s + - time.Duration + - Interval between each grant lease attempt * - pitaya.cluster.sd.etcd.revoke.timeout - 5s - time.Duration