Skip to content

Commit

Permalink
fixed register issue (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
xwi88 committed Mar 28, 2022
1 parent 44bb398 commit ad71635
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 54 deletions.
3 changes: 2 additions & 1 deletion internal/discovering/etcd/discover_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,13 @@ func (r *discover) initHandlerAndLogger() {
r.config.Return.Errors = false
}

if r.config.Return.Messages && r.config.MessagesHandler != nil {
if r.config.Return.Messages && r.config.MessagesHandler != nil || r.config.ReturnResolve && r.config.AddressesParser != nil {
go func() {
r.config.MessagesHandler(r.messages)
}()
} else {
r.config.Return.Messages = false
r.config.ReturnResolve = false
}

if (!r.config.Return.Messages || !r.config.Return.Errors) && r.config.Logger == nil {
Expand Down
114 changes: 61 additions & 53 deletions internal/registering/etcd/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (r *register) Run() error {

err := r.register()
if err != nil {
panic(fmt.Errorf("[%s] register name:%s, key:%s, err: %w", moduleName, r.config.Name, r.uniKey, err))
return fmt.Errorf("[%s] register name:%s, key:%s, err: %w", moduleName, r.config.Name, r.uniKey, err)
}

r.start = time.Now()
Expand Down Expand Up @@ -376,69 +376,77 @@ func (r *register) keepAliveOnce(ctx context.Context, key string) error {
return nil
}

// keepAlive start and keepAlive, keep the init val. If you want dynamic update val, replace with keepAliveOnce.
func (r *register) keepAlive(ctx context.Context, key string) (err error) {
aliveDeal := func() error {
reps, err := r.client.KeepAlive(ctx, r.leaseID)
if err != nil {
r.handlerError(fmt.Errorf("[%s] keepAlive name:%s, key:%v, leaseID:%v, err:%w", moduleName, r.config.Name, key, r.leaseID, err))
return err
}
for rsp := range reps {
if rsp == nil {
continue
return r.keepAliveLoop(ctx, key)
}

// keepAlive start and keepAlive, keep the init val. If you want dynamic update val, replace with keepAliveOnce.
func (r *register) keepAliveLoop(ctx context.Context, key string) (err error) {
go func() {
aliveDeal := func() error {
reps, err := r.client.KeepAlive(ctx, r.leaseID)
if err != nil {
r.handlerError(fmt.Errorf("[%s] keepAlive name:%s, key:%v, leaseID:%v, err:%w", moduleName, r.config.Name, key, r.leaseID, err))
return err
}
r.handlerMsg(fmt.Sprintf("[%s] keepAlive name:%s, key:%s, leaseID:%v, startupTime:%s, uptime:%v, revision:%v, raft_term:%v",
moduleName, r.config.Name, key, r.leaseID, r.startupTime(), r.uptime(), rsp.GetRevision(), rsp.GetRaftTerm()))
for rsp := range reps {
if rsp == nil {
continue
}
r.handlerMsg(fmt.Sprintf("[%s] keepAlive name:%s, key:%s, leaseID:%v, startupTime:%s, uptime:%v, revision:%v, raft_term:%v",
moduleName, r.config.Name, key, r.leaseID, r.startupTime(), r.uptime(), rsp.GetRevision(), rsp.GetRaftTerm()))
}
return nil
}
return nil
}

// only first KeepAlive error will return
if err = aliveDeal(); err != nil {
return err
}

loop:
for {
r.latestLoopTryTime = time.Now()
r.curLoopTry.Add(1)
loopCount := r.curLoopTry.Load()
if loopCount > r.maxLoopTry {
r.handlerError(fmt.Errorf("[%s] keepAlive name:%s, key:%v, leaseID:%v, break and exited, over max loopCount:%v",
moduleName, r.config.Name, key, r.leaseID, r.maxLoopTry))
break loop
// only first KeepAlive error will return
if err = aliveDeal(); err != nil {
goto out
}

r.handlerMsg(fmt.Sprintf("[%s] keepAlive enter loop:%v, maxLoopTry:%v, name:%s, key:%s, err:%v", moduleName, loopCount, r.maxLoopTry, r.config.Name, key, err))
loop:
for {
r.latestLoopTryTime = time.Now()
r.curLoopTry.Add(1)
loopCount := r.curLoopTry.Load()
if loopCount > r.maxLoopTry {
r.handlerError(fmt.Errorf("[%s] keepAlive name:%s, key:%v, leaseID:%v, break and exited, over max loopCount:%v",
moduleName, r.config.Name, key, r.leaseID, r.maxLoopTry))
break loop
}

select {
case <-r.closed:
rsp, err := r.client.Delete(context.Background(), key)
r.handlerMsg(fmt.Sprintf("[%s] keepAlive close and delete name:%s, key:%s, err:%v, response:%v", moduleName, r.config.Name, key, err, rsp))
break loop
default:
if r.client == nil {
err = r.reConnectEtcd()
r.handlerMsg(fmt.Sprintf("[%s] keepAlive reConnectEtcd curLoopTry:%v, name:%s, key:%s, err:%v", moduleName, r.curLoopTry, r.config.Name, key, err))
}
}

err = aliveDeal()
if err != nil {
r.handlerError(fmt.Errorf("[%s] keepAlive name:%s, key:%v, leaseID:%v, err:%w", moduleName, r.config.Name, key, r.leaseID, err))
}
r.handlerMsg(fmt.Sprintf("[%s] keepAlive enter loop:%v, maxLoopTry:%v, name:%s, key:%s, err:%v", moduleName, loopCount, r.maxLoopTry, r.config.Name, key, err))

select {
case <-r.closed:
rsp, err := r.client.Delete(context.Background(), key)
r.handlerMsg(fmt.Sprintf("[%s] keepAlive close and delete name:%s, key:%s, err:%v, response:%v", moduleName, r.config.Name, key, err, rsp))
break loop
default:
}
// reset after long time, no loop try
latestNoLoopDeltaTime := time.Now().Sub(r.latestLoopTryTime)
if latestNoLoopDeltaTime > r.deltaResetTimeDuration {
r.curLoopTry.Store(0)
r.handlerMsg(fmt.Sprintf("[%s] keepAlive reset curLoopTry name:%s, key:%s, latestNoLoopDeltaTime:%v more than r.deltaResetTimeDuration:%v",
moduleName, r.config.Name, key, latestNoLoopDeltaTime, r.deltaResetTimeDuration))
}
err = aliveDeal()
if err != nil {
r.handlerError(fmt.Errorf("[%s] keepAlive name:%s, key:%v, leaseID:%v, err:%w", moduleName, r.config.Name, key, r.leaseID, err))
}

if r.client == nil {
err = r.reConnectEtcd()
r.handlerMsg(fmt.Sprintf("[%s] keepAlive reConnectEtcd curLoopTry:%v, name:%s, key:%s, err:%v", moduleName, r.curLoopTry, r.config.Name, key, err))
// reset after long time, no loop try
latestNoLoopDeltaTime := time.Now().Sub(r.latestLoopTryTime)
if latestNoLoopDeltaTime > r.deltaResetTimeDuration {
r.curLoopTry.Store(0)
r.handlerMsg(fmt.Sprintf("[%s] keepAlive reset curLoopTry name:%s, key:%s, latestNoLoopDeltaTime:%v more than r.deltaResetTimeDuration:%v",
moduleName, r.config.Name, key, latestNoLoopDeltaTime, r.deltaResetTimeDuration))
}
time.Sleep(r.config.KeepAlive.Interval*2/3 + 1)
}
out:
r.handlerMsg(fmt.Sprintf("[%s] keepAlive break loop and exit name:%s, key:%s, err:%v", moduleName, r.config.Name, key, err))
}()

}
r.handlerMsg(fmt.Sprintf("[%s] keepAlive break loop and exit name:%s, key:%s, err:%v", moduleName, r.config.Name, key, err))
return err
}

Expand Down

0 comments on commit ad71635

Please sign in to comment.