/
keepalive.go
152 lines (136 loc) · 3.02 KB
/
keepalive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package keepalive
import (
"context"
"time"
)
type ClientConn interface {
Context() context.Context
Ping(context.Context) error
Close() error
}
type RetryFunc = func() (when time.Time, err error)
type RetryFuncFactory = func() RetryFunc
// Config KeepAlive config
type Config struct {
// Interval between two success pings
Interval time.Duration
// WaitForPong how long it will waits for pong response.
WaitForPong time.Duration
// NewRetryPolicy creates retry policy for the connection when ping fails.
NewRetryPolicy RetryFuncFactory
}
var defaultOptions = options{
config: MakeConfig(time.Second * 6),
}
type CfgOpt struct {
config Config
}
func (o *CfgOpt) apply(opts *options) {
opts.config = o.config
}
func WithConfig(config Config) *CfgOpt {
return &CfgOpt{
config: config,
}
}
type options struct {
config Config
}
// MakeConfig creates a policy that detects dropped connections within the connTimeout limit
// while attempting to make 3 pings during that period.
func MakeConfig(connTimeout time.Duration) Config {
// 3 times ping-pong
duration := connTimeout / 6
return Config{
Interval: duration,
WaitForPong: duration,
NewRetryPolicy: func() RetryFunc {
// The first failure is detected after 2*duration:
// 1 since the previous ping, plus 1 for the next ping-pong to timeout
start := time.Now()
attempt := time.Duration(1)
return func() (time.Time, error) {
attempt++
// Try to send ping and wait for pong 2 more times
if time.Since(start) <= 2*2*duration {
return start.Add(attempt * duration), nil
}
return time.Time{}, ErrKeepAliveDeadlineExceeded
}
},
}
}
// A Option sets options such as config etc.
type Option interface {
apply(*options)
}
type KeepAlive struct {
cfg options
}
func New(opts ...Option) *KeepAlive {
cfg := defaultOptions
for _, o := range opts {
o.apply(&cfg)
}
return &KeepAlive{
cfg: cfg,
}
}
func ping(c ClientConn, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(c.Context(), timeout)
defer cancel()
return c.Ping(ctx)
}
func (k *KeepAlive) doPing(c ClientConn) (bool, error) {
err := ping(c, k.cfg.config.WaitForPong)
if err == nil {
return true, nil
}
if err != context.DeadlineExceeded {
if err == context.Canceled {
return false, nil
}
c.Close()
return false, err
}
retryPolicy := k.cfg.config.NewRetryPolicy()
for {
when, err := retryPolicy()
if err != nil {
c.Close()
return false, err
}
select {
case <-c.Context().Done():
return false, nil
case <-time.After(time.Until(when)):
err := ping(c, k.cfg.config.WaitForPong)
if err == context.DeadlineExceeded {
continue
}
if err == nil {
return true, nil
}
c.Close()
return false, err
}
}
}
func (k *KeepAlive) Run(c ClientConn) error {
ticker := time.NewTicker(k.cfg.config.Interval)
defer ticker.Stop()
ok := true
for ok {
select {
case <-ticker.C:
var err error
ok, err = k.doPing(c)
if err != nil {
return err
}
case <-c.Context().Done():
return nil
}
}
return nil
}