forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graceful_termination.go
209 lines (182 loc) · 6.17 KB
/
graceful_termination.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipvs
import (
"fmt"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
)
// Timing parameters for graceful termination of IPVS real servers.
const (
	// rsGracefulDeletePeriod is a 15 minute duration.
	// NOTE(review): nothing in this file reads it; presumably it is the
	// intended upper bound on how long a real server may stay in the graceful
	// delete list — confirm against the rest of the package.
	rsGracefulDeletePeriod = 15 * time.Minute

	// rsCheckDeleteInterval is the period at which Run re-attempts deletion of
	// the real servers held in the graceful delete list.
	rsCheckDeleteInterval = time.Minute
)
// listItem is one entry of the graceful delete list: a real server together
// with the virtual server it is attached to.
type listItem struct {
	// VirtualServer is the IPVS virtual server that RealServer belongs to.
	VirtualServer *utilipvs.VirtualServer
	// RealServer is the destination that is waiting to be deleted.
	RealServer *utilipvs.RealServer
}
// String returns the unique name of the item's real server, qualified with its
// virtual server, as built by GetUniqueRSName.
func (g *listItem) String() string {
	vs, rs := g.VirtualServer, g.RealServer
	return GetUniqueRSName(vs, rs)
}
// GetUniqueRSName returns the key that identifies rs within vs: the string
// form of the virtual server, a "/", and the string form of the real server.
func GetUniqueRSName(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) string {
	parts := []string{vs.String(), rs.String()}
	return strings.Join(parts, "/")
}
// graceTerminateRSList is the set of real servers waiting for graceful
// deletion, keyed by their unique name (see GetUniqueRSName).
type graceTerminateRSList struct {
	// lock is taken by add, remove and exist before they touch list.
	lock sync.Mutex
	// list maps a unique real server name to its pending item.
	list map[string]*listItem
}
// add inserts rs into the list under its unique name.
// It returns true when the item was inserted and false when an item with the
// same unique name is already present (the existing entry is left untouched).
func (q *graceTerminateRSList) add(rs *listItem) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	key := rs.String()
	_, present := q.list[key]
	if !present {
		klog.V(5).Infof("Adding rs %v to graceful delete rsList", rs)
		q.list[key] = rs
		return true
	}
	return false
}
// remove drops the entry stored under rs's unique name.
// It returns true when an entry was removed and false when none was present.
func (q *graceTerminateRSList) remove(rs *listItem) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	key := rs.String()
	_, present := q.list[key]
	if !present {
		return false
	}
	delete(q.list, key)
	return true
}
// flushList calls handler once for every item currently in the list and
// removes each item for which handler reports deleted == true. Handler errors
// are logged and do not stop the pass.
//
// The list lock is held for the whole pass so that the map is never iterated
// while add, remove or exist access it from another goroutine. Because the
// lock is held, handler must not call back into add, remove or exist on this
// list.
//
// It returns true only if handler returned no error for any item.
func (q *graceTerminateRSList) flushList(handler func(rsToDelete *listItem) (bool, error)) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	success := true
	for name, rs := range q.list {
		deleted, err := handler(rs)
		if err != nil {
			klog.Errorf("Try delete rs %q err: %v", name, err)
			success = false
		}
		if deleted {
			klog.Infof("lw: remote out of the list: %s", name)
			// Delete by the key being visited instead of calling q.remove:
			// q.remove would try to re-acquire q.lock and deadlock. Deleting
			// the current key while ranging over a map is safe in Go.
			delete(q.list, name)
		}
	}
	return success
}
// exist looks up uniqueRS in the list.
// It returns the stored item and true when the name is present, and
// (nil, false) otherwise.
func (q *graceTerminateRSList) exist(uniqueRS string) (*listItem, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()

	item, found := q.list[uniqueRS]
	if !found {
		return nil, false
	}
	return item, true
}
// GracefulTerminationManager tracks real servers that are pending graceful
// termination and performs the IPVS updates and deletions needed to retire
// them.
type GracefulTerminationManager struct {
	// rsList holds the real servers waiting to be deleted.
	rsList graceTerminateRSList
	// ipvs is the interface used to list, update and delete real servers.
	ipvs utilipvs.Interface
}
// NewGracefulTerminationManager returns a GracefulTerminationManager that uses
// ipvs for all IPVS operations and starts with an empty graceful delete list.
func NewGracefulTerminationManager(ipvs utilipvs.Interface) *GracefulTerminationManager {
	manager := &GracefulTerminationManager{ipvs: ipvs}
	// The map must be allocated up front; add writes into it directly.
	manager.rsList.list = make(map[string]*listItem)
	return manager
}
// InTerminationList reports whether the real server identified by uniqueRS
// (see GetUniqueRSName) is currently in the graceful termination list.
func (m *GracefulTerminationManager) InTerminationList(uniqueRS string) bool {
	if _, found := m.rsList.exist(uniqueRS); found {
		return true
	}
	return false
}
// GracefulDeleteRS retires rs from vs. It first attempts an immediate delete;
// if the real server cannot be deleted yet, its weight is set to 0 through the
// IPVS interface and it is queued in the graceful delete list.
//
// Note that rs.Weight is overwritten on the caller's object. An error from the
// immediate delete attempt is only logged; the returned error is non-nil only
// when updating the real server's weight fails.
func (m *GracefulTerminationManager) GracefulDeleteRS(vs *utilipvs.VirtualServer, rs *utilipvs.RealServer) error {
	item := &listItem{VirtualServer: vs, RealServer: rs}

	// Try an immediate delete before falling back to the graceful path.
	done, delErr := m.deleteRsFunc(item)
	if delErr != nil {
		klog.Errorf("Delete rs %q err: %v", item.String(), delErr)
	}
	if done {
		return nil
	}

	rs.Weight = 0
	if updErr := m.ipvs.UpdateRealServer(vs, rs); updErr != nil {
		return updErr
	}

	klog.V(5).Infof("Adding an element to graceful delete rsList: %+v", item)
	m.rsList.add(item)
	return nil
}
// deleteRsFunc tries to delete rsToDelete's real server from its virtual
// server.
//
// Return values:
//   - (false, err) when listing the real servers or deleting the match fails;
//   - (false, nil) when the match is not UDP and still has active or inactive
//     connections, so deletion is postponed;
//   - (true, nil) when the match was deleted;
//   - (true, err) when no matching real server exists any more, so that
//     callers which drop items on deleted == true stop tracking it.
func (m *GracefulTerminationManager) deleteRsFunc(rsToDelete *listItem) (bool, error) {
	klog.Infof("Trying to delete rs: %s", rsToDelete.String())

	current, err := m.ipvs.GetRealServers(rsToDelete.VirtualServer)
	if err != nil {
		return false, err
	}

	for _, candidate := range current {
		if !rsToDelete.RealServer.Equal(candidate) {
			continue
		}

		// UDP gets no graceful termination: the RS is deleted immediately
		// (per the original note, existing connections are dropped on the next
		// packet because sysctlExpireNoDestConn=1). For every other protocol
		// the RS is kept until all of its connections have expired.
		isUDP := strings.ToUpper(rsToDelete.VirtualServer.Protocol) == "UDP"
		if !isUDP && candidate.ActiveConn+candidate.InactiveConn != 0 {
			klog.Infof("Not deleting, RS %v: %v ActiveConn, %v InactiveConn", rsToDelete.String(), candidate.ActiveConn, candidate.InactiveConn)
			return false, nil
		}

		klog.Infof("Deleting rs: %s", rsToDelete.String())
		if delErr := m.ipvs.DeleteRealServer(rsToDelete.VirtualServer, candidate); delErr != nil {
			return false, fmt.Errorf("Delete destination %q err: %v", candidate.String(), delErr)
		}
		return true, nil
	}

	return true, fmt.Errorf("Failed to delete rs %q, can't find the real server", rsToDelete.String())
}
// tryDeleteRs makes one pass over the graceful delete list, attempting to
// delete every queued real server, and logs an error if any attempt failed.
func (m *GracefulTerminationManager) tryDeleteRs() {
	if ok := m.rsList.flushList(m.deleteRsFunc); ok {
		return
	}
	klog.Errorf("Try flush graceful termination list err")
}
// MoveRSOutofGracefulDeleteList immediately deletes the real server identified
// by uniqueRS through the IPVS interface and then removes it from the graceful
// delete list. It returns an error when uniqueRS is not in the list or when
// the IPVS delete fails; in the latter case the item stays in the list.
func (m *GracefulTerminationManager) MoveRSOutofGracefulDeleteList(uniqueRS string) error {
	item, found := m.rsList.exist(uniqueRS)
	if !(found && item != nil) {
		return fmt.Errorf("failed to find rs: %q", uniqueRS)
	}

	if delErr := m.ipvs.DeleteRealServer(item.VirtualServer, item.RealServer); delErr != nil {
		return delErr
	}

	m.rsList.remove(item)
	return nil
}
// Run starts a background goroutine that calls tryDeleteRs every
// rsCheckDeleteInterval. The goroutine is started with wait.NeverStop, so it
// is never stopped by this manager.
func (m *GracefulTerminationManager) Run() {
	retry := m.tryDeleteRs
	go wait.Until(retry, rsCheckDeleteInterval, wait.NeverStop)
}