Skip to content

Commit

Permalink
ipvs: remove port opener
Browse files Browse the repository at this point in the history
  • Loading branch information
khenidak authored and aramase committed Mar 22, 2022
1 parent 39234d6 commit db608d3
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 85 deletions.
74 changes: 3 additions & 71 deletions pkg/proxy/ipvs/proxier.go
Expand Up @@ -222,7 +222,6 @@ type Proxier struct {
mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap
portsMap map[utilnet.LocalPort]utilnet.Closeable
nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when
// corresponding objects are synced after startup. This is used to avoid updating
Expand All @@ -249,7 +248,6 @@ type Proxier struct {
localDetector proxyutiliptables.LocalTrafficDetector
hostname string
nodeIP net.IP
portMapper utilnet.PortOpener
recorder record.EventRecorder

serviceHealthServer healthcheck.ServiceHealthServer
Expand Down Expand Up @@ -465,7 +463,6 @@ func NewProxier(ipt utiliptables.Interface,

proxier := &Proxier{
ipFamily: ipFamily,
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap),
Expand All @@ -480,7 +477,6 @@ func NewProxier(ipt utiliptables.Interface,
localDetector: localDetector,
hostname: hostname,
nodeIP: nodeIP,
portMapper: &utilnet.ListenPortOpener,
recorder: recorder,
serviceHealthServer: serviceHealthServer,
healthzServer: healthzServer,
Expand Down Expand Up @@ -1098,8 +1094,6 @@ func (proxier *Proxier) syncProxyRules() {
set.resetEntries()
}

// Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[utilnet.LocalPort]utilnet.Closeable{}
// activeIPVSServices represents IPVS service successfully created in this round of sync
activeIPVSServices := map[string]bool{}
// currentIPVSServices represent IPVS services listed from the system
Expand Down Expand Up @@ -1160,8 +1154,6 @@ func (proxier *Proxier) syncProxyRules() {
// reset slice to filtered entries
nodeIPs = nodeIPs[:idx]

localAddrSet := utilproxy.GetLocalAddrSet()

// Build IPVS rules for each service.
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
Expand Down Expand Up @@ -1251,41 +1243,6 @@ func (proxier *Proxier) syncProxyRules() {

// Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPStrings() {
// If the "external" IP happens to be an IP that is local to this
// machine, hold the local port open so no other process can open it
// (because the socket might open but it would never work).
if (svcInfo.Protocol() != v1.ProtocolSCTP) && localAddrSet.Has(net.ParseIP(externalIP)) {
// We do not start listening on SCTP ports, according to our agreement in the SCTP support KEP
lp := utilnet.LocalPort{
Description: "externalIP for " + svcNameString,
IP: externalIP,
IPFamily: localPortIPFamily,
Port: svcInfo.Port(),
Protocol: utilnet.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
msg := fmt.Sprintf("can't open %s, skipping this externalIP: %v", lp.String(), err)

proxier.recorder.Eventf(
&v1.ObjectReference{
Kind: "Node",
Name: proxier.hostname,
UID: types.UID(proxier.hostname),
Namespace: "",
}, v1.EventTypeWarning, err.Error(), msg)
klog.Error(msg)
continue
}
klog.V(2).Infof("Opened local port %s", lp.String())
replacementPortsMap[lp] = socket
}
} // We're holding the port, so it's OK to install IPVS rules.

// ipset call
entry := &utilipset.Entry{
IP: externalIP,
Expand Down Expand Up @@ -1464,24 +1421,9 @@ func (proxier *Proxier) syncProxyRules() {

// For ports on node IPs, open the actual port and hold it.
for _, lp := range lps {
if proxier.portsMap[lp] != nil {
klog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
// We do not start listening on SCTP ports, according to our agreement in the
// SCTP support KEP
} else if svcInfo.Protocol() != v1.ProtocolSCTP {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
klog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
klog.V(2).Infof("Opened local port %s", lp.String())

if lp.Protocol == utilnet.UDP {
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
}
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules.
if svcInfo.Protocol() != v1.ProtocolSCTP && lp.Protocol == utilnet.UDP {
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, v1.ProtocolUDP)
}
}

// Nodeports need SNAT, unless they're local.
Expand Down Expand Up @@ -1634,8 +1576,6 @@ func (proxier *Proxier) syncProxyRules() {
if err != nil {
klog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesData.Bytes())
metrics.IptablesRestoreFailuresTotal.Inc()
// Revert new local ports.
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
return
}
for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes {
Expand All @@ -1646,14 +1586,6 @@ func (proxier *Proxier) syncProxyRules() {
}
}

// Close old local ports and save new ones.
for k, v := range proxier.portsMap {
if replacementPortsMap[k] == nil {
v.Close()
}
}
proxier.portsMap = replacementPortsMap

// Get legacy bind address
// currentBindAddrs represents ip addresses bind to DefaultDummyDevice from the system
currentBindAddrs, err := proxier.netlinkHandle.ListBindAddress(DefaultDummyDevice)
Expand Down
14 changes: 0 additions & 14 deletions pkg/proxy/ipvs/proxier_test.go
Expand Up @@ -70,18 +70,6 @@ func (f *fakeIPGetter) BindedIPs() (sets.String, error) {
return f.bindedIPs, nil
}

// fakePortOpener implements portOpener.
type fakePortOpener struct {
openPorts []*utilnet.LocalPort
}

// OpenLocalPort fakes out the listen() and bind() used by syncProxyRules
// to lock a local port.
func (f *fakePortOpener) OpenLocalPort(lp *utilnet.LocalPort) (utilnet.Closeable, error) {
f.openPorts = append(f.openPorts, lp)
return nil, nil
}

// fakeKernelHandler implements KernelHandler.
type fakeKernelHandler struct {
modules []string
Expand Down Expand Up @@ -153,8 +141,6 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
strictARP: false,
localDetector: proxyutiliptables.NewNoOpLocalDetector(),
hostname: testHostname,
portsMap: make(map[utilnet.LocalPort]utilnet.Closeable),
portMapper: &fakePortOpener{[]*utilnet.LocalPort{}},
serviceHealthServer: healthcheck.NewFakeServiceHealthServer(),
ipvsScheduler: DefaultScheduler,
ipGetter: &fakeIPGetter{nodeIPs: nodeIPs},
Expand Down

0 comments on commit db608d3

Please sign in to comment.