Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/iptables: grab iptables locks if iptables-restore doesn't support --wait #44895

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions cluster/get-kube-local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ function create_cluster {
--volume=/var/lib/docker/:/var/lib/docker:rw \
--volume=/var/lib/kubelet/:/var/lib/kubelet:rw \
--volume=/var/run:/var/run:rw \
--volume=/run/xtables.lock:/run/xtables.lock:rw \
--net=host \
--pid=host \
--privileged=true \
Expand Down
1 change: 1 addition & 0 deletions hack/local-up-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ function start_kubelet {
--volume=/var/lib/docker/:/var/lib/docker:ro \
--volume=/var/lib/kubelet/:/var/lib/kubelet:rw \
--volume=/dev:/dev \
--volume=/run/xtables.lock:/run/xtables.lock:rw \
${cred_bind} \
--net=host \
--privileged=true \
Expand Down
1 change: 1 addition & 0 deletions pkg/util/iptables/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"//vendor/github.com/godbus/dbus:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
],
)

Expand Down
9 changes: 9 additions & 0 deletions pkg/util/iptables/OWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
reviewers:
- dcbw
- thockin
- eparis
approvers:
- dcbw
- thockin
- eparis

97 changes: 95 additions & 2 deletions pkg/util/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ package iptables
import (
"bytes"
"fmt"
"net"
"os"
"regexp"
"strings"
"sync"
"syscall"
"time"

godbus "github.com/godbus/dbus"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utilversion "k8s.io/kubernetes/pkg/util/version"
Expand Down Expand Up @@ -122,6 +127,8 @@ const MinCheckVersion = "1.4.11"
const MinWaitVersion = "1.4.20"
const MinWait2Version = "1.4.22"

const LockfilePath16x = "/run/xtables.lock"

// runner implements Interface in terms of exec("iptables").
type runner struct {
mu sync.Mutex
Expand All @@ -131,31 +138,43 @@ type runner struct {
hasCheck bool
waitFlag []string
restoreWaitFlag []string
lockfilePath string

reloadFuncs []func()
signal chan *godbus.Signal
}

// New returns a new Interface which will exec iptables.
func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface {
// newInternal returns a new Interface which will exec iptables, and allows the
// caller to change the iptables-restore lockfile path
func newInternal(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol, lockfilePath string) Interface {
vstring, err := getIPTablesVersionString(exec)
if err != nil {
glog.Warningf("Error checking iptables version, assuming version at least %s: %v", MinCheckVersion, err)
vstring = MinCheckVersion
}

if lockfilePath == "" {
lockfilePath = LockfilePath16x
}

runner := &runner{
exec: exec,
dbus: dbus,
protocol: protocol,
hasCheck: getIPTablesHasCheckCommand(vstring),
waitFlag: getIPTablesWaitFlag(vstring),
restoreWaitFlag: getIPTablesRestoreWaitFlag(exec),
lockfilePath: lockfilePath,
}
runner.connectToFirewallD()
return runner
}

// New returns a new Interface which will exec iptables.
func New(exec utilexec.Interface, dbus utildbus.Interface, protocol Protocol) Interface {
return newInternal(exec, dbus, protocol, "")
}

// Destroy is part of Interface.
func (runner *runner) Destroy() {
if runner.signal != nil {
Expand Down Expand Up @@ -325,6 +344,71 @@ func (runner *runner) RestoreAll(data []byte, flush FlushFlag, counters RestoreC
return runner.restoreInternal(args, data, flush, counters)
}

type locker struct {
lock16 *os.File
lock14 *net.UnixListener
}

func (l *locker) Close() {
if l.lock16 != nil {
l.lock16.Close()
}
if l.lock14 != nil {
l.lock14.Close()
}
}

func (runner *runner) grabIptablesLocks() (*locker, error) {
var err error
var success bool

l := &locker{}
defer func(l *locker) {
// Clean up immediately on failure
if !success {
l.Close()
}
}(l)

if len(runner.restoreWaitFlag) > 0 {
// iptables-restore supports --wait; no need to grab locks
return l, nil
}

// Grab both 1.6.x and 1.4.x-style locks; we don't know what the
// iptables-restore version is if it doesn't support --wait, so we
// can't assume which lock method it'll use.

// Roughly duplicate iptables 1.6.x xtables_lock() function.
l.lock16, err = os.OpenFile(runner.lockfilePath, os.O_CREATE, 0600)
if err != nil {
return nil, fmt.Errorf("failed to open iptables lock %s: %v", runner.lockfilePath, err)
}

if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := syscall.Flock(int(l.lock16.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
return false, nil
}
return true, nil
}); err != nil {
return nil, fmt.Errorf("failed to acquire new iptables lock: %v", err)
}

// Roughly duplicate iptables 1.4.x xtables_lock() function.
if err := wait.PollImmediate(200*time.Millisecond, 2*time.Second, func() (bool, error) {
l.lock14, err = net.ListenUnix("unix", &net.UnixAddr{Name: "@xtables", Net: "unix"})
if err != nil {
return false, nil
}
return true, nil
}); err != nil {
return nil, fmt.Errorf("failed to acquire old iptables lock: %v", err)
}

success = true
return l, nil
}

// restoreInternal is the shared part of Restore/RestoreAll
func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFlag, counters RestoreCountersFlag) error {
runner.mu.Lock()
Expand All @@ -337,6 +421,15 @@ func (runner *runner) restoreInternal(args []string, data []byte, flush FlushFla
args = append(args, "--counters")
}

// Grab the iptables lock to prevent iptables-restore and iptables
// from stepping on each other. iptables-restore 1.6.2 will have
// a --wait option like iptables itself, but that's not widely deployed.
locker, err := runner.grabIptablesLocks()
if err != nil {
return err
}
defer locker.Close()

// run the command and return the output or an error including the output and error
fullArgs := append(runner.restoreWaitFlag, args...)
glog.V(4).Infof("running iptables-restore %v", fullArgs)
Expand Down
97 changes: 91 additions & 6 deletions pkg/util/iptables/iptables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package iptables

import (
"net"
"os"
"strings"
"syscall"
"testing"
"time"

Expand All @@ -26,6 +29,8 @@ import (
"k8s.io/kubernetes/pkg/util/exec"
)

const TestLockfilePath = "xtables.lock"

func getIPTablesCommand(protocol Protocol) string {
if protocol == ProtocolIpv4 {
return cmdIPTables
Expand Down Expand Up @@ -1036,12 +1041,12 @@ func TestRestoreAll(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
defer runner.Destroy()

err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err != nil {
t.Errorf("expected success, got %v", err)
t.Fatalf("expected success, got %v", err)
}

commandSet := sets.NewString(fcmd.CombinedOutputLog[2]...)
Expand Down Expand Up @@ -1080,12 +1085,12 @@ func TestRestoreAllWait(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
defer runner.Destroy()

err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err != nil {
t.Errorf("expected success, got %v", err)
t.Fatalf("expected success, got %v", err)
}

commandSet := sets.NewString(fcmd.CombinedOutputLog[2]...)
Expand Down Expand Up @@ -1125,12 +1130,12 @@ func TestRestoreAllWaitOldIptablesRestore(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
runner := New(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4)
runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
defer runner.Destroy()

err := runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err != nil {
t.Errorf("expected success, got %v", err)
t.Fatalf("expected success, got %v", err)
}

commandSet := sets.NewString(fcmd.CombinedOutputLog[2]...)
Expand All @@ -1151,3 +1156,83 @@ func TestRestoreAllWaitOldIptablesRestore(t *testing.T) {
t.Errorf("expected failure")
}
}

// TestRestoreAllGrabNewLock tests that the iptables code will grab the
// iptables /run lock when using an iptables-restore version that does not
// support the --wait argument
func TestRestoreAllGrabNewLock(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
// iptables version check
func() ([]byte, error) { return []byte("iptables v1.9.22"), nil },
// iptables-restore version check
func() ([]byte, error) { return []byte("unrecognized option: --version"), nil },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}

runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
defer runner.Destroy()

// Grab the /run lock and ensure the RestoreAll fails
runLock, err := os.OpenFile(TestLockfilePath, os.O_CREATE, 0600)
if err != nil {
t.Fatalf("expected to open %s, got %v", TestLockfilePath, err)
}
defer runLock.Close()

if err := syscall.Flock(int(runLock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB); err != nil {
t.Errorf("expected to lock %s, got %v", TestLockfilePath, err)
}

err = runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err == nil {
t.Errorf("expected failure, got success instead")
}
if !strings.Contains(err.Error(), "failed to acquire new iptables lock: timed out waiting for the condition") {
t.Errorf("expected timeout error, got %v", err)
}
}

// TestRestoreAllGrabOldLock tests that the iptables code will grab the
// iptables @xtables abstract unix socket lock when using an iptables-restore
// version that does not support the --wait argument
func TestRestoreAllGrabOldLock(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
// iptables version check
func() ([]byte, error) { return []byte("iptables v1.9.22"), nil },
// iptables-restore version check
func() ([]byte, error) { return []byte("unrecognized option: --version"), nil },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}

runner := newInternal(&fexec, dbus.NewFake(nil, nil), ProtocolIpv4, TestLockfilePath)
defer runner.Destroy()

// Grab the abstract @xtables socket
runLock, err := net.ListenUnix("unix", &net.UnixAddr{Name: "@xtables", Net: "unix"})
if err != nil {
t.Fatalf("expected to lock @xtables, got %v", err)
}
defer runLock.Close()

err = runner.RestoreAll([]byte{}, NoFlushTables, RestoreCounters)
if err == nil {
t.Errorf("expected failure, got success instead")
}
if !strings.Contains(err.Error(), "failed to acquire old iptables lock: timed out waiting for the condition") {
t.Errorf("expected timeout error, got %v", err)
}
}