diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 9417aa107e80..924d761842de 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -10,7 +10,10 @@ load( go_library( name = "go_default_library", - srcs = ["proxier.go"], + srcs = [ + "proxier.go", + "syncthrottle.go", + ], tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", @@ -36,7 +39,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["proxier_test.go"], + srcs = [ + "proxier_test.go", + "synchthrottle_test.go", + ], library = ":go_default_library", tags = ["automanaged"], deps = [ diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 265387c0d096..719ab161d0aa 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/api" apiservice "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/api/v1" @@ -197,11 +196,9 @@ type Proxier struct { portsMap map[localPort]closeable haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event - throttle flowcontrol.RateLimiter + throttle *syncThrottle // These are effectively const and do not need the mutex to be held. - syncPeriod time.Duration - minSyncPeriod time.Duration iptables utiliptables.Interface masqueradeAll bool masqueradeMark string @@ -297,21 +294,11 @@ func NewProxier(ipt utiliptables.Interface, go healthcheck.Run() - var throttle flowcontrol.RateLimiter - // Defaulting back to not limit sync rate when minSyncPeriod is 0. - if minSyncPeriod != 0 { - syncsPerSecond := float32(time.Second) / float32(minSyncPeriod) - // The average use case will process 2 updates in short succession - throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2) - } - return &Proxier{ serviceMap: make(proxyServiceMap), endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo), portsMap: make(map[localPort]closeable), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - throttle: throttle, + throttle: newSyncThrottle(minSyncPeriod, syncPeriod), iptables: ipt, masqueradeAll: masqueradeAll, masqueradeMark: masqueradeMark, @@ -414,15 +401,16 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { func (proxier *Proxier) Sync() { proxier.mu.Lock() defer proxier.mu.Unlock() + proxier.throttle.resetTimer() proxier.syncProxyRules() } // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { - t := time.NewTicker(proxier.syncPeriod) - defer t.Stop() + proxier.throttle.resetTimer() + defer proxier.throttle.stopTimer() for { - <-t.C + <-proxier.throttle.timer.C glog.V(6).Infof("Periodic sync") proxier.Sync() } @@ -807,12 +795,11 @@ func (proxier *Proxier) execConntrackTool(parameters ...string) error { // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held func (proxier *Proxier) syncProxyRules() { - if proxier.throttle != nil { - proxier.throttle.Accept() + if !proxier.throttle.allowSync() { + return } - start := time.Now() defer func() { - glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) + glog.V(4).Infof("syncProxyRules took %v", proxier.throttle.timeElapsedSinceLastSync()) }() // don't sync rules till we've received services and endpoints if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index e8d1c21fb5ff..7635083c8065 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/util/intstr" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" + "time" ) func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) { @@ -497,6 +498,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { return &Proxier{ exec: &exec.FakeExec{}, serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + throttle: newSyncThrottle(0, time.Second*10), iptables: ipt, endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo), clusterCIDR: "10.0.0.0/24", diff --git a/pkg/proxy/iptables/synchthrottle_test.go b/pkg/proxy/iptables/synchthrottle_test.go new file mode 100644 index 000000000000..ba8f231caf9b --- /dev/null +++ b/pkg/proxy/iptables/synchthrottle_test.go @@ -0,0 +1,52 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "testing" + "time" +) + +func TestBack2BackSync(t *testing.T) { + st := newSyncThrottle(time.Second, time.Second*10) + t1 := st.allowSync() + t2 := st.allowSync() + if !t1 || !t2 { + t.Errorf("Back 2 Back Sync failed") + } +} + +func TestRejectResetAccept(t *testing.T) { + minSync := time.Millisecond * 50 + st := newSyncThrottle(minSync, time.Second) + st.resetTimer() + t1 := st.allowSync() + t2 := st.allowSync() + t3 := st.allowSync() + if !t1 || !t2 || t3 { + t.Errorf("Failed to reject spamming") + } + <-st.timer.C + if st.timeElapsedSinceLastSync() < minSync { + t.Errorf("Failed to wait till elapsed minSync") + } + t4 := st.allowSync() + if !t4 { + t.Errorf("Failed to allow after minSync ellapsed") + } + +} diff --git a/pkg/proxy/iptables/syncthrottle.go b/pkg/proxy/iptables/syncthrottle.go new file mode 100644 index 000000000000..80b3bdd885f5 --- /dev/null +++ b/pkg/proxy/iptables/syncthrottle.go @@ -0,0 +1,89 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package iptables + +import ( + "time" + + "github.com/golang/glog" + "k8s.io/client-go/util/flowcontrol" +) + +// syncThrottle is a utility wrapper to handle time based +// iptables synchronization. +// The purpose is to rate limit updates to prevent excessive +// iptables synchronizations during endpoint watch updates. +// It is not thread-safe. +type syncThrottle struct { + rl flowcontrol.RateLimiter // rate limiter to prevent accessive iptables sync + timer *time.Timer // timer used to trigger a iptables sync + lastSync time.Time // time since last sync + minSyncPeriod time.Duration // the minimum period allowed between iptables sync e.g. 1 second + syncPeriod time.Duration // default rectification cycle +} + +// creates a new syncThrottle +func newSyncThrottle(minSyncPeriod time.Duration, syncPeriod time.Duration) *syncThrottle { + st := &syncThrottle{ + minSyncPeriod: minSyncPeriod, + syncPeriod: syncPeriod, + } + + if minSyncPeriod != 0 { + // input of minSyncPeriod is a duration typically in seconds, but could be .5 + syncsPerSecond := float32(time.Second) / float32(minSyncPeriod) + // The average use case will process 2 updates in short succession + st.rl = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2) + } + return st +} + +// allowSync returns true when we are allowed to sync +// based on the minSyncPeriod. When false, the +// timer for the syncPeriod is reset +func (s *syncThrottle) allowSync() bool { + if s.rl != nil { + if s.rl.TryAccept() == false { + duration := s.timeElapsedSinceLastSync() + glog.V(4).Infof("Attempting to synch too often. Duration: %v, min period: %v", duration, s.minSyncPeriod) + s.timer.Reset(s.minSyncPeriod - duration) + return false + } + } + s.lastSync = time.Now() + return true +} + +// timeEllapsedSinceLastSync will return the duration since the last sync +func (s *syncThrottle) timeElapsedSinceLastSync() time.Duration { + return time.Since(s.lastSync) +} + +// resetTimer sets the timer back to the default syncPeriod +func (s *syncThrottle) resetTimer() { + if s.timer == nil { + s.timer = time.NewTimer(s.syncPeriod) + } else { + s.stopTimer() + s.timer.Reset(s.syncPeriod) + } +} + +// stopTimer will stop the currently running timer +func (s *syncThrottle) stopTimer() { + s.timer.Stop() +}