Skip to content

Commit

Permalink
Fix for non-blocking updates during min-syc-period
Browse files Browse the repository at this point in the history
  • Loading branch information
Timothy St. Clair committed Jan 25, 2017
1 parent f42440c commit cc45ab9
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 24 deletions.
10 changes: 8 additions & 2 deletions pkg/proxy/iptables/BUILD
Expand Up @@ -10,7 +10,10 @@ load(

go_library(
name = "go_default_library",
srcs = ["proxier.go"],
srcs = [
"proxier.go",
"syncthrottle.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
Expand All @@ -36,7 +39,10 @@ go_library(

go_test(
name = "go_default_test",
srcs = ["proxier_test.go"],
srcs = [
"proxier_test.go",
"synchthrottle_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
Expand Down
31 changes: 9 additions & 22 deletions pkg/proxy/iptables/proxier.go
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/api"
apiservice "k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/api/v1"
Expand Down Expand Up @@ -197,11 +196,9 @@ type Proxier struct {
portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event
haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event
throttle flowcontrol.RateLimiter
throttle *syncThrottle

// These are effectively const and do not need the mutex to be held.
syncPeriod time.Duration
minSyncPeriod time.Duration
iptables utiliptables.Interface
masqueradeAll bool
masqueradeMark string
Expand Down Expand Up @@ -297,21 +294,11 @@ func NewProxier(ipt utiliptables.Interface,

go healthcheck.Run()

var throttle flowcontrol.RateLimiter
// Defaulting back to not limit sync rate when minSyncPeriod is 0.
if minSyncPeriod != 0 {
syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
// The average use case will process 2 updates in short succession
throttle = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2)
}

return &Proxier{
serviceMap: make(proxyServiceMap),
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
portsMap: make(map[localPort]closeable),
syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod,
throttle: throttle,
throttle: newSyncThrottle(minSyncPeriod, syncPeriod),
iptables: ipt,
masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark,
Expand Down Expand Up @@ -414,15 +401,16 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
func (proxier *Proxier) Sync() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.throttle.resetTimer()
proxier.syncProxyRules()
}

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
t := time.NewTicker(proxier.syncPeriod)
defer t.Stop()
proxier.throttle.resetTimer()
defer proxier.throttle.stopTimer()
for {
<-t.C
<-proxier.throttle.timer.C
glog.V(6).Infof("Periodic sync")
proxier.Sync()
}
Expand Down Expand Up @@ -807,12 +795,11 @@ func (proxier *Proxier) execConntrackTool(parameters ...string) error {
// The only other iptables rules are those that are setup in iptablesInit()
// assumes proxier.mu is held
func (proxier *Proxier) syncProxyRules() {
if proxier.throttle != nil {
proxier.throttle.Accept()
if !proxier.throttle.allowSync() {
return
}
start := time.Now()
defer func() {
glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
glog.V(4).Infof("syncProxyRules took %v", proxier.throttle.timeElapsedSinceLastSync())
}()
// don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
Expand Down
2 changes: 2 additions & 0 deletions pkg/proxy/iptables/proxier_test.go
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/util/intstr"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing"
"time"
)

func checkAllLines(t *testing.T, table utiliptables.Table, save []byte, expectedLines map[utiliptables.Chain]string) {
Expand Down Expand Up @@ -497,6 +498,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
return &Proxier{
exec: &exec.FakeExec{},
serviceMap: make(map[proxy.ServicePortName]*serviceInfo),
throttle: newSyncThrottle(0, time.Second*10),
iptables: ipt,
endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo),
clusterCIDR: "10.0.0.0/24",
Expand Down
52 changes: 52 additions & 0 deletions pkg/proxy/iptables/synchthrottle_test.go
@@ -0,0 +1,52 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package iptables

import (
"testing"
"time"
)

func TestBack2BackSync(t *testing.T) {
st := newSyncThrottle(time.Second, time.Second*10)
t1 := st.allowSync()
t2 := st.allowSync()
if !t1 || !t2 {
t.Errorf("Back 2 Back Sync failed")
}
}

func TestRejectResetAccept(t *testing.T) {
minSync := time.Millisecond * 50
st := newSyncThrottle(minSync, time.Second)
st.resetTimer()
t1 := st.allowSync()
t2 := st.allowSync()
t3 := st.allowSync()
if !t1 || !t2 || t3 {
t.Errorf("Failed to reject spamming")
}
<-st.timer.C
if st.timeElapsedSinceLastSync() < minSync {
t.Errorf("Failed to wait till elapsed minSync")
}
t4 := st.allowSync()
if !t4 {
t.Errorf("Failed to allow after minSync ellapsed")
}

}
89 changes: 89 additions & 0 deletions pkg/proxy/iptables/syncthrottle.go
@@ -0,0 +1,89 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package iptables

import (
"time"

"github.com/golang/glog"
"k8s.io/client-go/util/flowcontrol"
)

// syncThrottle is a utility wrapper to handle time based
// iptables synchronization.
// The purpose is to rate limit updates to prevent excessive
// iptables synchronizations during endpoint watch updates.
// It is not thread-safe.
type syncThrottle struct {
rl flowcontrol.RateLimiter // rate limiter to prevent accessive iptables sync
timer *time.Timer // timer used to trigger a iptables sync
lastSync time.Time // time since last sync
minSyncPeriod time.Duration // the minimum period allowed between iptables sync e.g. 1 second
syncPeriod time.Duration // default rectification cycle
}

// creates a new syncThrottle
func newSyncThrottle(minSyncPeriod time.Duration, syncPeriod time.Duration) *syncThrottle {
st := &syncThrottle{
minSyncPeriod: minSyncPeriod,
syncPeriod: syncPeriod,
}

if minSyncPeriod != 0 {
// input of minSyncPeriod is a duration typically in seconds, but could be .5
syncsPerSecond := float32(time.Second) / float32(minSyncPeriod)
// The average use case will process 2 updates in short succession
st.rl = flowcontrol.NewTokenBucketRateLimiter(syncsPerSecond, 2)
}
return st
}

// allowSync returns true when we are allowed to sync
// based on the minSyncPeriod. When false, the
// timer for the syncPeriod is reset
func (s *syncThrottle) allowSync() bool {
if s.rl != nil {
if s.rl.TryAccept() == false {
duration := s.timeElapsedSinceLastSync()
glog.V(4).Infof("Attempting to synch too often. Duration: %v, min period: %v", duration, s.minSyncPeriod)
s.timer.Reset(s.minSyncPeriod - duration)
return false
}
}
s.lastSync = time.Now()
return true
}

// timeEllapsedSinceLastSync will return the duration since the last sync
func (s *syncThrottle) timeElapsedSinceLastSync() time.Duration {
return time.Since(s.lastSync)
}

// resetTimer sets the timer back to the default syncPeriod
func (s *syncThrottle) resetTimer() {
if s.timer == nil {
s.timer = time.NewTimer(s.syncPeriod)
} else {
s.stopTimer()
s.timer.Reset(s.syncPeriod)
}
}

// stopTimer will stop the currently running timer
func (s *syncThrottle) stopTimer() {
s.timer.Stop()
}

0 comments on commit cc45ab9

Please sign in to comment.