Skip to content

Commit

Permalink
bugfix grpc ip direct connection status check (#2316)
Browse files Browse the repository at this point in the history
Update internal/net/grpc/pool/pool.go

Signed-off-by: kpango <kpango@vdaas.org>
  • Loading branch information
kpango committed Feb 5, 2024
1 parent 2a5261c commit adf1b61
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions internal/net/grpc/pool/pool.go
Expand Up @@ -284,7 +284,7 @@ func (p *pool) flush() {
}

func (p *pool) refreshConn(ctx context.Context, idx int, pc *poolConn, addr string) (err error) {
if pc != nil && pc.addr == addr && isHealthy(pc.conn) {
if pc != nil && pc.addr == addr && isHealthy(ctx, pc.conn) {
return nil
}
if pc != nil {
Expand All @@ -295,7 +295,7 @@ func (p *pool) refreshConn(ctx context.Context, idx int, pc *poolConn, addr stri
conn, err := p.dial(ctx, addr)
if err != nil {
if pc != nil {
if isHealthy(pc.conn) {
if isHealthy(ctx, pc.conn) {
log.Debugf("dialing new connection to %s failed,\terror: %v,\tbut existing connection to %s is healthy will keep existing connection", addr, err, pc.addr)
return nil
}
Expand Down Expand Up @@ -478,7 +478,7 @@ func (p *pool) dial(ctx context.Context, addr string) (conn *ClientConn, err err
log.Debugf("failed to dial gRPC connection to %s,\terror: %v", addr, err)
return nil, err
}
if !isHealthy(conn) {
if !isHealthy(ctx, conn) {
if conn != nil {
err = conn.Close()
if err != nil && !errors.Is(err, grpc.ErrClientConnClosing) {
Expand Down Expand Up @@ -514,7 +514,7 @@ func (p *pool) IsHealthy(ctx context.Context) (healthy bool) {
var cnt, unhealthy int
pl := p.len()
err := p.loop(ctx, func(ctx context.Context, idx int, pc *poolConn) bool {
if pc == nil || !isHealthy(pc.conn) {
if pc == nil || !isHealthy(ctx, pc.conn) {
if p.isIP {
if pc != nil && pc.addr != "" {
err := p.refreshConn(ctx, idx, pc, pc.addr)
Expand Down Expand Up @@ -603,11 +603,11 @@ func (p *pool) getHealthyConn(ctx context.Context, cnt, retry uint64) (*ClientCo
if pl > 0 {
idx = int(p.current.Add(1) % pl)
}
if pc := p.load(idx); pc != nil && isHealthy(pc.conn) {
if pc := p.load(idx); pc != nil && isHealthy(ctx, pc.conn) {
return pc.conn, true
}
conn, err := p.dial(ctx, p.addr)
if err == nil && conn != nil && isHealthy(conn) {
if err == nil && conn != nil && isHealthy(ctx, conn) {
p.store(idx, &poolConn{
conn: conn,
addr: p.addr,
Expand All @@ -619,7 +619,7 @@ func (p *pool) getHealthyConn(ctx context.Context, cnt, retry uint64) (*ClientCo
}

if pl > 0 {
if pc := p.load(int(p.current.Add(1) % pl)); pc != nil && isHealthy(pc.conn) {
if pc := p.load(int(p.current.Add(1) % pl)); pc != nil && isHealthy(ctx, pc.conn) {
return pc.conn, true
}
}
Expand Down Expand Up @@ -699,7 +699,7 @@ func (p *pool) scanGRPCPort(ctx context.Context) (port uint16, err error) {
net.JoinHostPort(p.host, port),
append(p.dopts, grpc.WithBlock())...)

if err == nil && isHealthy(conn) && conn.Close() == nil {
if err == nil && isHealthy(ctx, conn) && conn.Close() == nil {
// if no error and healthy the port is ready for gRPC
return port, nil
}
Expand Down Expand Up @@ -784,7 +784,7 @@ func (pc *poolConn) Close(ctx context.Context, delay time.Duration) error {
}
}

func isHealthy(conn *ClientConn) bool {
func isHealthy(ctx context.Context, conn *ClientConn) bool {
if conn == nil {
log.Warn("gRPC target connection is nil")
return false
Expand All @@ -797,7 +797,15 @@ func isHealthy(conn *ClientConn) bool {
log.Debugf("gRPC target %s's connection status will be Ready soon\tstatus: %s", conn.Target(), state.String())
return true
case connectivity.Idle:
log.Warnf("gRPC target %s's connection status is waiting for target\tstatus: %s", conn.Target(), state.String())
log.Debugf("gRPC target %s's connection status is waiting for target\tstatus: %s", conn.Target(), state.String())
conn.Connect()
if conn.WaitForStateChange(ctx, state) {
state = conn.GetState()
if state == connectivity.Ready || state == connectivity.Connecting {
log.Debugf("gRPC target %s's connection status enabled for target\tstatus: %s", conn.Target(), state.String())
return true
}
}
return false
case connectivity.Shutdown, connectivity.TransientFailure:
log.Errorf("gRPC target %s's connection status is unhealthy\tstatus: %s", conn.Target(), state.String())
Expand Down

0 comments on commit adf1b61

Please sign in to comment.