-
Notifications
You must be signed in to change notification settings - Fork 9
/
connection.go
86 lines (77 loc) · 2.78 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
Copyright (c) 2022-2023 Nordix Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package connection
import (
"context"
"fmt"
"time"
"github.com/nordix/meridio/pkg/health"
"github.com/nordix/meridio/pkg/log"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
type gRPCConnectionStateMonitor struct {
healthService string
*grpc.ClientConn
}
// Monitor -
// Monitors connection state changes and updates respective health service through context.
// Currently only gRPC connection is supported, which relies on EXPERIMENTAL API.
func Monitor(ctx context.Context, healthService string, cc interface{}) error {
switch cc := cc.(type) {
case *grpc.ClientConn:
m := gRPCConnectionStateMonitor{
healthService: healthService,
ClientConn: cc,
}
go func() {
defer log.Logger.V(1).Info("Connection monitor exit", "service", m.healthService)
for {
s := m.GetState()
health.SetServingStatus(ctx, m.healthService, s == connectivity.Ready)
log.Logger.V(2).Info("Connection", "service", m.healthService, "state", s, "target", m.Target())
// Note: gRPC will NOT establish underlying transport connection except for the
// initial "dial" or unless the user tries to send sg and there's no backing connection.
// Therefore trigger transport connect if gRPC connection state is Idle for 3 seconds,
// to avoid the health service from getting stuck in NOT_SERVING if the user "remains silent"
// for too long.
// The gRPC connection status might transition to IDLE from READY due to IDLE_TIMEOUT or
// receiving GOAWAY while there are no pending RPCs, and from CONNECTING due to IDLE_TIMEOUT.
// So, even with the below forced connect, we might still report service unavailability because
// of the protocol.
waitCtx := ctx
if s == connectivity.Idle {
// TODO: configurable timeout
timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
waitCtx = timeoutCtx
defer cancel()
}
// Block until connection state changes
if !m.WaitForStateChange(waitCtx, s) {
// context got timeout or canceled
select {
case <-ctx.Done():
// main context done
return
default:
// timeout; try re-connect
m.Connect()
}
}
}
}()
return nil
default:
return fmt.Errorf("unknown connection")
}
}