-
Notifications
You must be signed in to change notification settings - Fork 2
/
stub.go
122 lines (102 loc) · 2.83 KB
/
stub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package stub
import (
	"context"
	"errors"
	"log"
	"time"

	_grpc "google.golang.org/grpc"
	_conn "google.golang.org/grpc/connectivity"
)
// defaultConnectionTimeout is the connection timeout, in seconds, that New
// falls back to when Options.ConnectionTimeOut is zero or negative.
const defaultConnectionTimeout = 3
// Stub wraps a single gRPC client connection created by New and exposes
// small helpers to inspect and close it.
type Stub struct {
// client is the underlying gRPC connection; it is set by New.
client *_grpc.ClientConn
}
// Options configures how New dials the gRPC server.
type Options struct {
// Address of the gRPC server in host:port form, e.g. 10.10.10.10:50051.
Address string
// IsTLSEnabled signals that the server requires certificate-based
// authentication. When false, New dials with grpc.WithInsecure.
// NOTE: TLS is not implemented yet; New adds no transport credentials when
// this is true.
IsTLSEnabled bool
// WaitConnectionReady makes New dial with grpc.WithBlock, so New blocks
// until the connection is established or ConnectionTimeOut elapses.
WaitConnectionReady bool
// MaxRetry is ignored when WaitConnectionReady is true.
// When WaitConnectionReady is false it is the maximum number of readiness
// checks performed by the background goroutine that New starts.
MaxRetry int
// ConnectionTimeOut is expressed in seconds and bounds the dial context
// used by New. When WaitConnectionReady is false it is also how long the
// background retry goroutine sleeps before its first readiness check.
// Values <= 0 are replaced with the default of 3 seconds
// (defaultConnectionTimeout).
ConnectionTimeOut int
}
// New dials the gRPC server described by o and returns a Stub wrapping the
// resulting client connection.
//
// If o.ConnectionTimeOut is zero or negative it is replaced (in o itself, as
// before) with defaultConnectionTimeout. The dial is bounded by that timeout.
//
// When o.WaitConnectionReady is true the dial blocks (grpc.WithBlock) until
// the connection is up or the timeout expires. Otherwise New returns right
// away and, if o.MaxRetry > 0, starts a background goroutine that waits one
// timeout period and then calls retry up to o.MaxRetry times, each attempt
// bounded by its own timeout. The goroutine stops as soon as retry reports
// success, the connection is shut down (e.g. via Stub.Close), or the attempts
// are exhausted, in which case a failure is logged.
//
// New returns an error when o is nil or when the dial itself fails.
func New(o *Options) (*Stub, error) {
	if o == nil {
		return nil, errors.New("stub: nil options")
	}
	if o.ConnectionTimeOut <= 0 {
		o.ConnectionTimeOut = defaultConnectionTimeout // default is 3 seconds
	}

	// Snapshot everything the background goroutine needs so it never reads o
	// after New has returned and the caller may have reused or changed it.
	timeout := time.Duration(o.ConnectionTimeOut) * time.Second
	maxRetry := o.MaxRetry

	// dialCtx only bounds the dial; it is cancelled when New returns and must
	// not be handed to the background goroutine.
	dialCtx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var dialOpts []_grpc.DialOption
	if !o.IsTLSEnabled {
		dialOpts = append(dialOpts, _grpc.WithInsecure())
	}
	// TODO add tls support: with IsTLSEnabled set no transport credentials are
	// configured yet, so the dial is expected to be rejected by grpc.
	if o.WaitConnectionReady {
		dialOpts = append(dialOpts, _grpc.WithBlock())
	}

	conn, err := _grpc.DialContext(dialCtx, o.Address, dialOpts...)
	if err != nil {
		if conn != nil {
			// Best-effort cleanup; the dial error is what the caller needs.
			_ = conn.Close()
		}
		return nil, err
	}

	// With no retries requested there is nothing to do in the background, so
	// no goroutine is started (and no misleading failure is logged).
	if !o.WaitConnectionReady && maxRetry > 0 {
		go func(con *_grpc.ClientConn) {
			// Give the initial non-blocking dial one full timeout period
			// before the first readiness check.
			time.Sleep(timeout)

			for attempt := 0; attempt < maxRetry; attempt++ {
				// Each attempt gets a fresh deadline; reusing dialCtx would
				// make every wait fail immediately because it is already
				// cancelled by the time this goroutine runs.
				attemptCtx, cancelAttempt := context.WithTimeout(context.Background(), timeout)
				ready := retry(attemptCtx, con)
				cancelAttempt()
				if ready {
					return
				}
				// A shut-down connection never becomes ready again; stop
				// instead of burning the remaining attempts.
				if con.GetState() == _conn.Shutdown {
					return
				}
			}
			log.Println("grpc: exceed max retry, failed to reconnect.")
		}(conn)
	}

	return &Stub{client: conn}, nil
}
// retry waits for con to become usable and reports whether it did.
//
// A connection in the Ready or Idle state is treated as usable and retry
// returns true immediately. Otherwise it keeps waiting on state changes until
// the connection reaches Ready or Idle (returns true) or ctx is done before
// that happens (returns false).
//
// WaitForStateChange only signals that the state is no longer the one passed
// in, so the new state is re-read after every change instead of assuming the
// connection became ready (it may just have moved, for example, from
// connecting to a failure state).
func retry(ctx context.Context, con *_grpc.ClientConn) bool {
	state := con.GetState()
	if state == _conn.Ready || state == _conn.Idle {
		log.Println("grpc: connection is ready")
		return true
	}

	log.Println("grpc: connection is not ready yet, waiting for state change")
	for {
		if !con.WaitForStateChange(ctx, state) {
			// ctx expired or was cancelled before the state changed.
			log.Println("grpc: reconnecting is failed, retrying...")
			return false
		}
		state = con.GetState()
		if state == _conn.Ready || state == _conn.Idle {
			log.Println("grpc: connection is ready")
			return true
		}
	}
}
// ClientConn returns the underlying gRPC client connection held by the Stub.
func (s *Stub) ClientConn() *_grpc.ClientConn {
	conn := s.client
	return conn
}
// IsClientReady reports whether the connection's current state is Ready.
func (s *Stub) IsClientReady() bool {
	state := s.client.GetState()
	return state == _conn.Ready
}
// ServerAddress returns the target string the underlying connection reports.
func (s *Stub) ServerAddress() string {
	target := s.client.Target()
	return target
}
// Close closes the underlying gRPC connection and returns any error from it.
func (s *Stub) Close() error {
	if err := s.client.Close(); err != nil {
		return err
	}
	return nil
}