Skip to content

Commit

Permalink
Made clear when etcd was booting and when it was just trying to recon…
Browse files Browse the repository at this point in the history
…nect
  • Loading branch information
gabrielerzinger committed Aug 8, 2018
1 parent 83a8886 commit 72e6c86
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 71 deletions.
157 changes: 94 additions & 63 deletions cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,28 @@ import (
)

type etcdServiceDiscovery struct {
cli *clientv3.Client
config *config.Config
syncServersInterval time.Duration
heartbeatTTL time.Duration
logHeartbeat bool
lastHeartbeatTime time.Time
leaseID clientv3.LeaseID
serverMapByType sync.Map
serverMapByID sync.Map
etcdEndpoints []string
etcdPrefix string
etcdDialTimeout time.Duration
running bool
server *Server
stopChan chan bool
lastSyncTime time.Time
listeners []SDListener
revokeTimeout time.Duration
bootstrapTimeout time.Duration
appDieChan chan bool
cli *clientv3.Client
config *config.Config
syncServersInterval time.Duration
heartbeatTTL time.Duration
logHeartbeat bool
lastHeartbeatTime time.Time
leaseID clientv3.LeaseID
serverMapByType sync.Map
serverMapByID sync.Map
etcdEndpoints []string
etcdPrefix string
etcdDialTimeout time.Duration
running bool
server *Server
stopChan chan bool
lastSyncTime time.Time
listeners []SDListener
revokeTimeout time.Duration
grantLeaseTimeout time.Duration
grantLeaseMaxRetries int
grantLeaseInterval time.Duration
appDieChan chan bool
}

// NewEtcdServiceDiscovery ctor
Expand All @@ -71,13 +73,13 @@ func NewEtcdServiceDiscovery(
client = cli[0]
}
sd := &etcdServiceDiscovery{
config: config,
running: false,
server: server,
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
config: config,
running: false,
server: server,
listeners: make([]SDListener, 0),
stopChan: make(chan bool),
appDieChan: appDieChan,
cli: client,
cli: client,
}

sd.configure()
Expand All @@ -93,37 +95,77 @@ func (sd *etcdServiceDiscovery) configure() {
sd.logHeartbeat = sd.config.GetBool("pitaya.cluster.sd.etcd.heartbeat.log")
sd.syncServersInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.syncservers.interval")
sd.revokeTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.revoke.timeout")
sd.bootstrapTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.bootstrap.timeout")
sd.grantLeaseTimeout = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.timeout")
sd.grantLeaseMaxRetries = sd.config.GetInt("pitaya.cluster.sd.etcd.grantlease.maxretries")
sd.grantLeaseInterval = sd.config.GetDuration("pitaya.cluster.sd.etcd.grantlease.retryinterval")
}

func (sd *etcdServiceDiscovery) watchLeaseChan(c <-chan *clientv3.LeaseKeepAliveResponse) {
failedGrantLeaseAttempts := 0
for {
select {
case <-sd.stopChan:
return
case kaRes := <-c:
if kaRes == nil {
logger.Log.Warn("sd: error renewing etcd lease, rebootstrapping")
for {
err := sd.bootstrap()
if err != nil {
logger.Log.Warn("sd: error rebootstrapping lease, will retry in 5 seconds")
time.Sleep(5 * time.Second)
continue
} else {
case leaseKeepAliveResponse := <-c:
if leaseKeepAliveResponse != nil {
if sd.logHeartbeat {
logger.Log.Debugf("sd: etcd lease %x renewed", leaseKeepAliveResponse.ID)
}
failedGrantLeaseAttempts = 0
continue
}
logger.Log.Warn("sd: error renewing etcd lease, reconfiguring")
for {
err := sd.renewLease()
if err != nil {
failedGrantLeaseAttempts = failedGrantLeaseAttempts + 1
if err == constants.ErrEtcdGrantLeaseTimeout {
logger.Log.Warn("sd: timed out trying to grant etcd lease")
if sd.appDieChan != nil {
sd.appDieChan <- true
}
return
}
if failedGrantLeaseAttempts >= sd.grantLeaseMaxRetries {
logger.Log.Warn("sd: exceeded max attempts to renew etcd lease")
if sd.appDieChan != nil {
sd.appDieChan <- true
}
return
}
logger.Log.Warnf("sd: error granting etcd lease, will retry in %d seconds", uint64(sd.grantLeaseInterval.Seconds()))
time.Sleep(sd.grantLeaseInterval)
continue
}
} else {
if sd.logHeartbeat {
logger.Log.Debugf("sd: etcd lease %x renewed", kaRes.ID)
}
return
}
}
}
}

func (sd *etcdServiceDiscovery) bootstrapLease() error {
// renewLease restablishes connection with etcd
func (sd *etcdServiceDiscovery) renewLease() error {
c := make(chan error)
go func() {
defer close(c)
logger.Log.Infof("waiting for etcd lease")
err := sd.grantLease()
if err != nil {
c <- err
return
}
err = sd.bootstrapServer(sd.server)
c <- err
}()
select {
case err := <-c:
return err
case <-time.After(sd.grantLeaseTimeout):
return constants.ErrEtcdGrantLeaseTimeout
}
}

func (sd *etcdServiceDiscovery) grantLease() error {
// grab lease
l, err := sd.cli.Grant(context.TODO(), int64(sd.heartbeatTTL.Seconds()))
if err != nil {
Expand Down Expand Up @@ -239,28 +281,17 @@ func (sd *etcdServiceDiscovery) GetServersByType(serverType string) (map[string]
}

func (sd *etcdServiceDiscovery) bootstrap() error {
c := make(chan error)
defer close(c)
go func() {
logger.Log.Infof("waiting for etcd connection")
err := sd.bootstrapLease()
if err != nil {
c <- err
return
}
err = sd.bootstrapServer(sd.server)
c <- err
}()
select {
case err := <-c:
err := sd.grantLease()
if err != nil {
return err
case <-time.After(sd.bootstrapTimeout):
logger.Log.Warn("timed out waiting for etcd connection")
if sd.appDieChan != nil {
sd.appDieChan <- true
}
return nil
}

err = sd.bootstrapServer(sd.server)
if err != nil {
return err
}

return nil
}

// GetServer returns a server given it's id
Expand Down Expand Up @@ -392,7 +423,7 @@ func (sd *etcdServiceDiscovery) Shutdown() error {
}

// revoke prevents Pitaya from crashing when etcd is not available
func (sd *etcdServiceDiscovery) revoke() error {
func (sd *etcdServiceDiscovery) revoke() error {
c := make(chan error)
defer close(c)
go func() {
Expand Down
10 changes: 5 additions & 5 deletions cluster/etcd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestEtcdSDBootstrapLease(t *testing.T) {
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, cli)
err := e.bootstrapLease()
err := e.grantLease()
assert.NoError(t, err)
assert.NotEmpty(t, e.leaseID)
})
Expand All @@ -91,7 +91,7 @@ func TestEtcdSDBootstrapLeaseError(t *testing.T) {
c, cli := helpers.GetTestEtcd(t)
c.Terminate(t)
e := getEtcdSD(t, config, table.server, cli)
err := e.bootstrapLease()
err := e.grantLease()
assert.Error(t, err)
})
}
Expand All @@ -105,7 +105,7 @@ func TestEtcdSDBootstrapServer(t *testing.T) {
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, cli)
e.bootstrapLease()
e.grantLease()
err := e.bootstrapServer(table.server)
assert.NoError(t, err)
v, err := cli.Get(context.TODO(), getKey(table.server.ID, table.server.Type))
Expand All @@ -130,7 +130,7 @@ func TestEtcdSDDeleteServer(t *testing.T) {
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, cli)
e.bootstrapLease()
e.grantLease()
err := e.bootstrapServer(table.server)
assert.NoError(t, err)
e.deleteServer(table.server.ID)
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestEtcdSDGetServer(t *testing.T) {
c, cli := helpers.GetTestEtcd(t)
defer c.Terminate(t)
e := getEtcdSD(t, config, table.server, cli)
e.bootstrapLease()
e.grantLease()
e.bootstrapServer(table.server)
sv, err := e.GetServer(table.server.ID)
assert.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ func (c *Config) fillDefaultValues() {
"pitaya.cluster.sd.etcd.endpoints": "localhost:2379",
"pitaya.cluster.sd.etcd.prefix": "pitaya/",
"pitaya.cluster.sd.etcd.heartbeat.ttl": "60s",
"pitaya.cluster.sd.etcd.bootstrap.timeout": "60s",
"pitaya.cluster.sd.etcd.grantlease.timeout": "60s",
"pitaya.cluster.sd.etcd.grantlease.maxretries": 15,
"pitaya.cluster.sd.etcd.grantlease.retryinterval": "5s",
"pitaya.cluster.sd.etcd.revoke.timeout": "5s",
"pitaya.cluster.sd.etcd.heartbeat.log": false,
"pitaya.cluster.sd.etcd.syncservers.interval": "120s",
Expand Down
1 change: 1 addition & 0 deletions constants/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
ErrCloseClosedGroup = errors.New("close closed group")
ErrCloseClosedSession = errors.New("close closed session")
ErrClosedGroup = errors.New("group closed")
ErrEtcdGrantLeaseTimeout = errors.New("timed out waiting for etcd lease grant")
ErrFrontSessionCantPushToFront = errors.New("frontend session can't push to front")
ErrIllegalUID = errors.New("illegal uid")
ErrMemberNotFound = errors.New("member not found in the group")
Expand Down
12 changes: 10 additions & 2 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ These configuration values configure service discovery for the default etcd serv
- 60s
- time.Time
- Hearbeat interval for the etcd lease
* - pitaya.cluster.sd.etcd.bootstrap.timeout
* - pitaya.cluster.sd.etcd.grantlease.timeout
- 60s
- time.Duration
- Connection timeout when waiting for etcd to become available
- Timeout for etcd lease
* - pitaya.cluster.sd.etcd.grantlease.maxretries
- 15
- int
- Maximum number of attempts to etcd grant lease
* - pitaya.cluster.sd.etcd.grantlease.retryinterval
- 5s
- time.Duration
- Interval between each grant lease attempt
* - pitaya.cluster.sd.etcd.revoke.timeout
- 5s
- time.Duration
Expand Down

0 comments on commit 72e6c86

Please sign in to comment.