From 9679eaaa1189fc26691b1b5d83fd79e48841c330 Mon Sep 17 00:00:00 2001 From: Martynas Pumputis Date: Sun, 15 Oct 2017 19:43:02 +0200 Subject: [PATCH] WIP: netpol whitelist --- DEPENDENCIES | 2 +- npc/analyser.go | 135 ++++++-- npc/controller.go | 55 ++- npc/controller_test.go | 143 +++++++- npc/ipset/ipset.go | 29 ++ npc/namespace.go | 332 +++++++++++++++---- npc/selector.go | 68 +++- prog/weave-kube/weave-daemonset-k8s-1.7.yaml | 8 + prog/weave-npc/main.go | 54 +-- test/840_weave_kube_3_test.sh | 40 ++- test/Vagrantfile | 2 +- test/run-integration-tests.sh | 3 +- test/run_all.sh | 2 +- 13 files changed, 723 insertions(+), 150 deletions(-) diff --git a/DEPENDENCIES b/DEPENDENCIES index 62f24f95b2..19d08d1c2b 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -1,3 +1,3 @@ DOCKER_VERSION=17.06 -KUBERNETES_VERSION=1.6.2 +KUBERNETES_VERSION=1.7.8 KUBERNETES_CNI_VERSION=0.5.1 diff --git a/npc/analyser.go b/npc/analyser.go index cc8625f150..2a51356ab3 100644 --- a/npc/analyser.go +++ b/npc/analyser.go @@ -5,12 +5,13 @@ import ( apiv1 "k8s.io/api/core/v1" extnapi "k8s.io/api/extensions/v1beta1" + networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/util/intstr" "github.com/weaveworks/weave/npc/ipset" ) -func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) ( +func (ns *ns) analysePolicyLegacy(policy *extnapi.NetworkPolicy) ( rules map[string]*ruleSpec, nsSelectors, podSelectors map[string]*selectorSpec, err error) { @@ -19,7 +20,7 @@ func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) ( podSelectors = make(map[string]*selectorSpec) rules = make(map[string]*ruleSpec) - dstSelector, err := newSelectorSpec(&policy.Spec.PodSelector, ns.name, ipset.HashIP) + dstSelector, err := newSelectorSpec(&policy.Spec.PodSelector, true, ns.name, ipset.HashIP) if err != nil { return nil, nil, nil, err } @@ -45,7 +46,7 @@ func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) ( } else { // Ports is present and contains at least one item, then this rule allows traffic // only if the traffic matches at least one port in the ports list. - withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) { + withNormalisedProtoAndPortLegacy(ingressRule.Ports, func(proto, port string) { rule := newRuleSpec(&proto, nil, dstSelector, &port) rules[rule.key] = rule }) @@ -56,14 +57,14 @@ func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) ( for _, peer := range ingressRule.From { var srcSelector *selectorSpec if peer.PodSelector != nil { - srcSelector, err = newSelectorSpec(peer.PodSelector, ns.name, ipset.HashIP) + srcSelector, err = newSelectorSpec(peer.PodSelector, false, ns.name, ipset.HashIP) if err != nil { return nil, nil, nil, err } podSelectors[srcSelector.key] = srcSelector } if peer.NamespaceSelector != nil { - srcSelector, err = newSelectorSpec(peer.NamespaceSelector, "", ipset.ListSet) + srcSelector, err = newSelectorSpec(peer.NamespaceSelector, false, "", ipset.ListSet) if err != nil { return nil, nil, nil, err } @@ -77,7 +78,7 @@ func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) ( } else { // Ports is present and contains at least one item, then this rule allows traffic // only if the traffic matches at least one port in the ports list. - withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) { + withNormalisedProtoAndPortLegacy(ingressRule.Ports, func(proto, port string) { rule := newRuleSpec(&proto, srcSelector, dstSelector, &port) rules[rule.key] = rule }) @@ -89,26 +90,114 @@ func (ns *ns) analysePolicy(policy *extnapi.NetworkPolicy) ( return rules, nsSelectors, podSelectors, nil } -func withNormalisedProtoAndPort(npps []extnapi.NetworkPolicyPort, f func(proto, port string)) { - for _, npp := range npps { - // If no proto is specified, default to TCP - proto := string(apiv1.ProtocolTCP) - if npp.Protocol != nil { - proto = string(*npp.Protocol) - } +func (ns *ns) analysePolicy(policy *networkingv1.NetworkPolicy) ( + rules map[string]*ruleSpec, + nsSelectors, podSelectors map[string]*selectorSpec, + err error) { + + nsSelectors = make(map[string]*selectorSpec) + podSelectors = make(map[string]*selectorSpec) + rules = make(map[string]*ruleSpec) + + // If empty, matches all pods in a namespace + dstSelector, err := newSelectorSpec(&policy.Spec.PodSelector, true, ns.name, ipset.HashIP) + if err != nil { + return nil, nil, nil, err + } + podSelectors[dstSelector.key] = dstSelector + + // If ingress is empty then this NetworkPolicy does not allow any traffic + if policy.Spec.Ingress == nil || len(policy.Spec.Ingress) == 0 { + return + } + + for _, ingressRule := range policy.Spec.Ingress { + // If Ports is empty or missing, this rule matches all ports + allPorts := ingressRule.Ports == nil || len(ingressRule.Ports) == 0 + // If From is empty or missing, this rule matches all sources + allSources := ingressRule.From == nil || len(ingressRule.From) == 0 + + if allSources { + if allPorts { + rule := newRuleSpec(nil, nil, dstSelector, nil) + rules[rule.key] = rule + } else { + withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) { + rule := newRuleSpec(&proto, nil, dstSelector, &port) + rules[rule.key] = rule + }) + } + } else { + for _, peer := range ingressRule.From { + var srcSelector *selectorSpec + + // NetworkPolicyPeer describes a peer to allow traffic from. + // Exactly one of its fields must be specified. + if peer.PodSelector != nil { + srcSelector, err = newSelectorSpec(peer.PodSelector, false, ns.name, ipset.HashIP) + if err != nil { + return nil, nil, nil, err + } + podSelectors[srcSelector.key] = srcSelector + + } else if peer.NamespaceSelector != nil { + srcSelector, err = newSelectorSpec(peer.NamespaceSelector, false, "", ipset.ListSet) + if err != nil { + return nil, nil, nil, err + } + nsSelectors[srcSelector.key] = srcSelector + } - // If no port is specified, match any port. Let iptables executable handle - // service name resolution - port := "0:65535" - if npp.Port != nil { - switch npp.Port.Type { - case intstr.Int: - port = fmt.Sprintf("%d", npp.Port.IntVal) - case intstr.String: - port = npp.Port.StrVal + if allPorts { + rule := newRuleSpec(nil, srcSelector, dstSelector, nil) + rules[rule.key] = rule + } else { + withNormalisedProtoAndPort(ingressRule.Ports, func(proto, port string) { + rule := newRuleSpec(&proto, srcSelector, dstSelector, &port) + rules[rule.key] = rule + }) + } } } + } - f(proto, port) + return rules, nsSelectors, podSelectors, nil +} + +func withNormalisedProtoAndPortLegacy(npps []extnapi.NetworkPolicyPort, f func(proto, port string)) { + for _, npp := range npps { + f(proto(npp.Protocol), port(npp.Port)) } } + +func withNormalisedProtoAndPort(npps []networkingv1.NetworkPolicyPort, f func(proto, port string)) { + for _, npp := range npps { + f(proto(npp.Protocol), port(npp.Port)) + } +} + +func proto(p *apiv1.Protocol) string { + // If no proto is specified, default to TCP + proto := string(apiv1.ProtocolTCP) + if p != nil { + proto = string(*p) + } + + return proto +} + +func port(p *intstr.IntOrString) string { + // If no port is specified, match any port. Let iptables executable handle + // service name resolution + port := "0:65535" + if p != nil { + switch p.Type { + case intstr.Int: + port = fmt.Sprintf("%d", p.IntVal) + case intstr.String: + port = p.StrVal + } + } + + return port +} diff --git a/npc/controller.go b/npc/controller.go index 6a586f5177..fc363dfde4 100644 --- a/npc/controller.go +++ b/npc/controller.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" coreapi "k8s.io/api/core/v1" extnapi "k8s.io/api/extensions/v1beta1" + networkingv1 "k8s.io/api/networking/v1" "github.com/weaveworks/weave/common" "github.com/weaveworks/weave/npc/ipset" @@ -21,9 +22,9 @@ type NetworkPolicyController interface { UpdatePod(oldObj, newObj *coreapi.Pod) error DeletePod(obj *coreapi.Pod) error - AddNetworkPolicy(obj *extnapi.NetworkPolicy) error - UpdateNetworkPolicy(oldObj, newObj *extnapi.NetworkPolicy) error - DeleteNetworkPolicy(obj *extnapi.NetworkPolicy) error + AddNetworkPolicy(obj interface{}) error + UpdateNetworkPolicy(oldObj, newObj interface{}) error + DeleteNetworkPolicy(obj interface{}) error } type controller struct { @@ -36,16 +37,20 @@ type controller struct { nss map[string]*ns // ns name -> ns struct nsSelectors *selectorSet // selector string -> nsSelector + + legacy bool // use legacy network policy semantics (k8s pre-1.7) } -func New(nodeName string, ipt iptables.Interface, ips ipset.Interface) NetworkPolicyController { +func New(nodeName string, legacy bool, ipt iptables.Interface, ips ipset.Interface) NetworkPolicyController { c := &controller{ nodeName: nodeName, + legacy: legacy, ipt: ipt, ips: ips, nss: make(map[string]*ns)} - c.nsSelectors = newSelectorSet(ips, c.onNewNsSelector) + doNothing := func(*selector) error { return nil } + c.nsSelectors = newSelectorSet(ips, c.onNewNsSelector, doNothing, doNothing) return c } @@ -66,7 +71,7 @@ func (npc *controller) onNewNsSelector(selector *selector) error { func (npc *controller) withNS(name string, f func(ns *ns) error) error { ns, found := npc.nss[name] if !found { - newNs, err := newNS(name, npc.nodeName, npc.ipt, npc.ips, npc.nsSelectors) + newNs, err := newNS(name, npc.nodeName, npc.legacy, npc.ipt, npc.ips, npc.nsSelectors) if err != nil { return err } @@ -115,32 +120,47 @@ func (npc *controller) DeletePod(obj *coreapi.Pod) error { }) } -func (npc *controller) AddNetworkPolicy(obj *extnapi.NetworkPolicy) error { +func (npc *controller) AddNetworkPolicy(obj interface{}) error { npc.Lock() defer npc.Unlock() + nsName, err := nsName(obj) + if err != nil { + return err + } + common.Log.Infof("EVENT AddNetworkPolicy %s", js(obj)) - return npc.withNS(obj.ObjectMeta.Namespace, func(ns *ns) error { + return npc.withNS(nsName, func(ns *ns) error { return errors.Wrap(ns.addNetworkPolicy(obj), "add network policy") }) } -func (npc *controller) UpdateNetworkPolicy(oldObj, newObj *extnapi.NetworkPolicy) error { +func (npc *controller) UpdateNetworkPolicy(oldObj, newObj interface{}) error { npc.Lock() defer npc.Unlock() + nsName, err := nsName(oldObj) + if err != nil { + return err + } + common.Log.Infof("EVENT UpdateNetworkPolicy %s %s", js(oldObj), js(newObj)) - return npc.withNS(oldObj.ObjectMeta.Namespace, func(ns *ns) error { + return npc.withNS(nsName, func(ns *ns) error { return errors.Wrap(ns.updateNetworkPolicy(oldObj, newObj), "update network policy") }) } -func (npc *controller) DeleteNetworkPolicy(obj *extnapi.NetworkPolicy) error { +func (npc *controller) DeleteNetworkPolicy(obj interface{}) error { npc.Lock() defer npc.Unlock() + nsName, err := nsName(obj) + if err != nil { + return err + } + common.Log.Infof("EVENT DeleteNetworkPolicy %s", js(obj)) - return npc.withNS(obj.ObjectMeta.Namespace, func(ns *ns) error { + return npc.withNS(nsName, func(ns *ns) error { return errors.Wrap(ns.deleteNetworkPolicy(obj), "delete network policy") }) } @@ -174,3 +194,14 @@ func (npc *controller) DeleteNamespace(obj *coreapi.Namespace) error { return errors.Wrap(ns.deleteNamespace(obj), "delete namespace") }) } + +func nsName(obj interface{}) (string, error) { + switch obj := obj.(type) { + case *networkingv1.NetworkPolicy: + return obj.ObjectMeta.Namespace, nil + case *extnapi.NetworkPolicy: + return obj.ObjectMeta.Namespace, nil + } + + return "", errInvalidNetworkPolicyObjType +} diff --git a/npc/controller_test.go b/npc/controller_test.go index b8dacdd28e..314fb3e7b6 100644 --- a/npc/controller_test.go +++ b/npc/controller_test.go @@ -9,6 +9,7 @@ import ( "github.com/weaveworks/weave/npc/ipset" coreapi "k8s.io/api/core/v1" extnapi "k8s.io/api/extensions/v1beta1" + networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" ) @@ -40,12 +41,22 @@ func (i *mockIPSet) Create(ipsetName ipset.Name, ipsetType ipset.Type) error { } func (i *mockIPSet) AddEntry(ipsetName ipset.Name, entry string, comment string) error { + return i.addEntry(ipsetName, entry, comment, true) +} + +func (i *mockIPSet) AddEntryIfNotExist(ipsetName ipset.Name, entry string, comment string) error { + return i.addEntry(ipsetName, entry, comment, false) +} + +func (i *mockIPSet) addEntry(ipsetName ipset.Name, entry string, comment string, checkIfExists bool) error { log.Printf("adding entry %s to %s", entry, ipsetName) - if _, ok := i.sets[entry]; !ok { + if _, ok := i.sets[string(ipsetName)]; !ok { return errors.Errorf("ipset %s does not exist", entry) } - if _, ok := i.sets[string(ipsetName)].subSets[entry]; ok { - return errors.Errorf("ipset %s is already a member of %s", entry, ipsetName) + if checkIfExists { + if _, ok := i.sets[string(ipsetName)].subSets[entry]; ok { + return errors.Errorf("ipset %s is already a member of %s", entry, ipsetName) + } } i.sets[string(ipsetName)].subSets[entry] = true @@ -53,18 +64,33 @@ func (i *mockIPSet) AddEntry(ipsetName ipset.Name, entry string, comment string) } func (i *mockIPSet) DelEntry(ipsetName ipset.Name, entry string) error { + return i.delEntry(ipsetName, entry, true) +} + +func (i *mockIPSet) DelEntryIfExists(ipsetName ipset.Name, entry string) error { + return i.delEntry(ipsetName, entry, false) +} + +func (i *mockIPSet) delEntry(ipsetName ipset.Name, entry string, checkIfExists bool) error { log.Printf("deleting entry %s from %s", entry, ipsetName) if _, ok := i.sets[string(ipsetName)]; !ok { return errors.Errorf("ipset %s does not exist", ipsetName) } - if _, ok := i.sets[string(ipsetName)].subSets[entry]; !ok { - return errors.Errorf("ipset %s is not a member of %s", entry, ipsetName) + if checkIfExists { + if _, ok := i.sets[string(ipsetName)].subSets[entry]; !ok { + return errors.Errorf("ipset %s is not a member of %s", entry, ipsetName) + } } delete(i.sets[string(ipsetName)].subSets, entry) return nil } +func (i *mockIPSet) Exist(ipsetName ipset.Name, entry string) bool { + _, found := i.sets[string(ipsetName)].subSets[entry] + return found +} + func (i *mockIPSet) Flush(ipsetName ipset.Name) error { return errors.New("Not Implemented") } @@ -154,7 +180,7 @@ func TestRegressionPolicyNamespaceOrdering3059(t *testing.T) { // Namespaces first m := newMockIPSet() - controller := New("foo", &mockIPTables{}, &m) + controller := New("foo", true, &mockIPTables{}, &m) const ( selectorIPSetName = "weave-I239Zp%sCvoVt*D6u=A!2]YEk" @@ -170,7 +196,7 @@ func TestRegressionPolicyNamespaceOrdering3059(t *testing.T) { // NetworkPolicy first m = newMockIPSet() - controller = New("foo", &mockIPTables{}, &m) + controller = New("foo", true, &mockIPTables{}, &m) controller.AddNetworkPolicy(networkPolicy) @@ -180,3 +206,106 @@ func TestRegressionPolicyNamespaceOrdering3059(t *testing.T) { require.Equal(t, true, m.sets[selectorIPSetName].subSets[sourceIPSetName]) } + +// Tests default-allow ipset behavior when running in non-legacy mode. +func TestDefaultAllow(t *testing.T) { + const ( + defaultAllowIPSetName = "weave-E.1.0W^NGSp]0_t5WwH/]gX@L" + fooPodIP = "10.32.0.10" + barPodIP = "10.32.0.11" + barPodNewIP = "10.32.0.12" + ) + + m := newMockIPSet() + controller := New("bar", false, &mockIPTables{}, &m) + + defaultNamespace := &coreapi.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + controller.AddNamespace(defaultNamespace) + + // Should create an ipset for default-allow + require.Contains(t, m.sets, defaultAllowIPSetName) + + podFoo := &coreapi.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "foo", + Namespace: "default", + Name: "foo", + Labels: map[string]string{"run": "foo"}}, + Status: coreapi.PodStatus{PodIP: fooPodIP}} + controller.AddPod(podFoo) + + // Should add the foo pod to default-allow + require.True(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + + podBar := &coreapi.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "bar", + Namespace: "default", + Name: "bar", + Labels: map[string]string{"run": "bar"}}, + Status: coreapi.PodStatus{PodIP: barPodIP}} + podBarNoIP := &coreapi.Pod{ObjectMeta: podBar.ObjectMeta} + controller.AddPod(podBarNoIP) + controller.UpdatePod(podBarNoIP, podBar) + + // Should add the bar pod to default-allow + require.True(t, m.Exist(defaultAllowIPSetName, barPodIP)) + + // Allow access from the bar pod to the foo pod + netpol := &networkingv1.NetworkPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Name: "allow-from-bar-to-foo", + Namespace: "default", + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{MatchLabels: map[string]string{"run": "foo"}}, + Ingress: []networkingv1.NetworkPolicyIngressRule{{ + From: []networkingv1.NetworkPolicyPeer{{ + PodSelector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"run": "bar"}, + }, + }}, + }}, + }, + } + controller.AddNetworkPolicy(netpol) + + // Should remove the foo pod from default-allow as the netpol selects it + require.False(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + require.True(t, m.Exist(defaultAllowIPSetName, barPodIP)) + + podBarWithNewIP := *podBar + podBarWithNewIP.Status.PodIP = barPodNewIP + controller.UpdatePod(podBar, &podBarWithNewIP) + + // Should update IP addr of the bar pod in default-allow + require.False(t, m.Exist(defaultAllowIPSetName, barPodIP)) + require.True(t, m.Exist(defaultAllowIPSetName, barPodNewIP)) + + controller.UpdatePod(&podBarWithNewIP, podBarNoIP) + // Should remove the bar pod from default-allow as it does not have any IP addr + require.False(t, m.Exist(defaultAllowIPSetName, barPodNewIP)) + + podFooWithNewLabel := *podFoo + podFooWithNewLabel.ObjectMeta.Labels = map[string]string{"run": "new-foo"} + controller.UpdatePod(podFoo, &podFooWithNewLabel) + + // Should bring back the foo pod to default-allow as it does not match dst of any netpol + require.True(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + + controller.UpdatePod(&podFooWithNewLabel, podFoo) + // Should remove from default-allow as it matches the netpol after the update + require.False(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + + controller.DeleteNetworkPolicy(netpol) + // Should bring back the foo pod to default-allow as no netpol selects it + require.True(t, m.Exist(defaultAllowIPSetName, fooPodIP)) + + controller.DeletePod(podFoo) + // Should remove foo pod to default-allow + require.False(t, m.Exist(defaultAllowIPSetName, fooPodIP)) +} diff --git a/npc/ipset/ipset.go b/npc/ipset/ipset.go index e1d0d6f7b5..2a9b0d4e62 100644 --- a/npc/ipset/ipset.go +++ b/npc/ipset/ipset.go @@ -20,7 +20,10 @@ const ( type Interface interface { Create(ipsetName Name, ipsetType Type) error AddEntry(ipsetName Name, entry string, comment string) error + AddEntryIfNotExist(ipsetName Name, entry string, comment string) error DelEntry(ipsetName Name, entry string) error + DelEntryIfExists(ipsetName Name, entry string) error + Exist(ipsetName Name, entry string) bool Flush(ipsetName Name) error Destroy(ipsetName Name) error @@ -74,6 +77,15 @@ func (i *ipset) AddEntry(ipsetName Name, entry string, comment string) error { return doExec(args...) } +// AddEntryIfNotExist does the same as AddEntry but bypasses the ref counting. +// Should be used only with "default-allow" ipsets. +func (i *ipset) AddEntryIfNotExist(ipsetName Name, entry string, comment string) error { + if i.count(ipsetName, entry) == 1 { + return nil + } + return i.AddEntry(ipsetName, entry, comment) +} + func (i *ipset) DelEntry(ipsetName Name, entry string) error { i.Logger.Printf("deleting entry %s from %s", entry, ipsetName) if i.dec(ipsetName, entry) > 0 { // still needed @@ -82,6 +94,19 @@ func (i *ipset) DelEntry(ipsetName Name, entry string) error { return doExec("del", string(ipsetName), entry) } +// DelEntryIfExists does the same as DelEntry but bypasses the ref counting. +// Should be used only with "default-allow" ipsets. +func (i *ipset) DelEntryIfExists(ipsetName Name, entry string) error { + if i.count(ipsetName, entry) == 0 { + return nil + } + return i.DelEntry(ipsetName, entry) +} + +func (i *ipset) Exist(ipsetName Name, entry string) bool { + return i.count(ipsetName, entry) > 0 +} + func (i *ipset) Flush(ipsetName Name) error { i.removeSet(ipsetName) return doExec("flush", string(ipsetName)) @@ -154,6 +179,10 @@ func (rc *refCount) dec(ipsetName Name, entry string) int { return rc.ref[k] } +func (rc *refCount) count(ipsetName Name, entry string) int { + return rc.ref[key{ipsetName, entry}] +} + func (rc *refCount) removeSet(ipsetName Name) { for k := range rc.ref { if k.ipsetName == ipsetName { diff --git a/npc/namespace.go b/npc/namespace.go index 6caf712b2b..85c893e09b 100644 --- a/npc/namespace.go +++ b/npc/namespace.go @@ -2,10 +2,12 @@ package npc import ( "encoding/json" + "errors" "fmt" coreapi "k8s.io/api/core/v1" extnapi "k8s.io/api/extensions/v1beta1" + networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/uuid" @@ -15,26 +17,35 @@ import ( "github.com/weaveworks/weave/npc/iptables" ) +var errInvalidNetworkPolicyObjType = errors.New("invalid NetworkPolicy object type") + type ns struct { ipt iptables.Interface // interface to iptables ips ipset.Interface // interface to ipset - name string // k8s Namespace name - nodeName string // my node name - namespace *coreapi.Namespace // k8s Namespace object - pods map[types.UID]*coreapi.Pod // k8s Pod objects by UID - policies map[types.UID]*extnapi.NetworkPolicy // k8s NetworkPolicy objects by UID + name string // k8s Namespace name + nodeName string // my node name + namespace *coreapi.Namespace // k8s Namespace object + pods map[types.UID]*coreapi.Pod // k8s Pod objects by UID + policies map[types.UID]interface{} // k8s NetworkPolicy objects by UID uid types.UID // surrogate UID to own allPods selector allPods *selectorSpec // hash:ip ipset of all pod IPs in this namespace + // stores IP addrs of pods which are not selected by any dst podSelector of + // any netpol; used only in non-legacy mode and is used as a dst in + // the WEAVE-NPC-DEFAULT iptables chain. + defaultAllowIPSet ipset.Name + nsSelectors *selectorSet podSelectors *selectorSet rules *ruleSet + + legacy bool } -func newNS(name, nodeName string, ipt iptables.Interface, ips ipset.Interface, nsSelectors *selectorSet) (*ns, error) { - allPods, err := newSelectorSpec(&metav1.LabelSelector{}, name, ipset.HashIP) +func newNS(name, nodeName string, legacy bool, ipt iptables.Interface, ips ipset.Interface, nsSelectors *selectorSet) (*ns, error) { + allPods, err := newSelectorSpec(&metav1.LabelSelector{}, false, name, ipset.HashIP) if err != nil { return nil, err } @@ -45,13 +56,22 @@ func newNS(name, nodeName string, ipt iptables.Interface, ips ipset.Interface, n name: name, nodeName: nodeName, pods: make(map[types.UID]*coreapi.Pod), - policies: make(map[types.UID]*extnapi.NetworkPolicy), + policies: make(map[types.UID]interface{}), uid: uuid.NewUUID(), allPods: allPods, nsSelectors: nsSelectors, - rules: newRuleSet(ipt)} + rules: newRuleSet(ipt), + legacy: legacy} - ns.podSelectors = newSelectorSet(ips, ns.onNewPodSelector) + ns.podSelectors = newSelectorSet(ips, ns.onNewPodSelector, ns.onNewDstPodSelector, ns.onDestroyDstPodSelector) + + if !legacy { + defaultAllowIPSet := ipset.Name(IpsetNamePrefix + shortName("default-allow:"+name)) + if err := ips.Create(defaultAllowIPSet, ipset.HashIP); err != nil { + return nil, err + } + ns.defaultAllowIPSet = defaultAllowIPSet + } if err := ns.podSelectors.provision(ns.uid, nil, map[string]*selectorSpec{ns.allPods.key: ns.allPods}); err != nil { return nil, err @@ -75,9 +95,65 @@ func (ns *ns) onNewPodSelector(selector *selector) error { if err := selector.addEntry(pod.Status.PodIP, podComment(pod)); err != nil { return err } + + } + } + } + return nil +} + +func (ns *ns) onNewDstPodSelector(selector *selector) error { + if ns.legacy { + return nil + } + + for _, pod := range ns.pods { + if hasIP(pod) { + // Remove the pod from default-allow if dst podselector matches the pod + if selector.matches(pod.ObjectMeta.Labels) { + if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, pod.Status.PodIP); err != nil { + return err + } + } + } + } + + return nil +} + +func (ns *ns) onDestroyDstPodSelector(selector *selector) error { + if ns.legacy { + return nil + } + + for _, pod := range ns.pods { + if hasIP(pod) { + if selector.matches(pod.ObjectMeta.Labels) { + if err := ns.addToDefaultAllowIfNoMatching(pod); err != nil { + return err + } } } } + + return nil +} + +// Add pod IP addr to default-allow ipset if there are no matching dst selectors +func (ns *ns) addToDefaultAllowIfNoMatching(pod *coreapi.Pod) error { + found := false + // TODO(mp) optimize (avoid iterating over selectors) by ref counting IP addrs. + for _, s := range ns.podSelectors.entries { + if ns.podSelectors.dstSelectorExist(s) && s.matches(pod.ObjectMeta.Labels) { + found = true + break + } + } + if !found { + if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, pod.Status.PodIP, podComment(pod)); err != nil { + return err + } + } return nil } @@ -98,7 +174,19 @@ func (ns *ns) addPod(obj *coreapi.Pod) error { if ns.checkLocalPod(obj) { ns.ips.AddEntry(LocalIpset, obj.Status.PodIP, podComment(obj)) } - return ns.podSelectors.addToMatching(obj.ObjectMeta.Labels, obj.Status.PodIP, podComment(obj)) + + found, err := ns.podSelectors.addToMatching(obj.ObjectMeta.Labels, obj.Status.PodIP, podComment(obj)) + if err != nil { + return err + } + // If there are no matching dst selectors, add the pod to default-allow + if !ns.legacy && !found { + if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, obj.Status.PodIP, podComment(obj)); err != nil { + return err + } + } + + return nil } func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { @@ -113,6 +201,13 @@ func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { if ns.checkLocalPod(oldObj) { ns.ips.DelEntry(LocalIpset, oldObj.Status.PodIP) } + + if !ns.legacy { + if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { + return err + } + } + return ns.podSelectors.delFromMatching(oldObj.ObjectMeta.Labels, oldObj.Status.PodIP) } @@ -120,7 +215,30 @@ func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { if ns.checkLocalPod(newObj) { ns.ips.AddEntry(LocalIpset, newObj.Status.PodIP, podComment(newObj)) } - return ns.podSelectors.addToMatching(newObj.ObjectMeta.Labels, newObj.Status.PodIP, podComment(newObj)) + found, err := ns.podSelectors.addToMatching(newObj.ObjectMeta.Labels, newObj.Status.PodIP, podComment(newObj)) + if err != nil { + return err + } + + if !ns.legacy && !found { + if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, newObj.Status.PodIP, podComment(newObj)); err != nil { + return err + } + } + + return nil + } + + if !ns.legacy && + oldObj.Status.PodIP != newObj.Status.PodIP && + ns.ips.Exist(ns.defaultAllowIPSet, oldObj.Status.PodIP) { + + if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { + return err + } + if err := ns.ips.AddEntryIfNotExist(ns.defaultAllowIPSet, newObj.Status.PodIP, podComment(newObj)); err != nil { + return err + } } if !equals(oldObj.ObjectMeta.Labels, newObj.ObjectMeta.Labels) || @@ -142,6 +260,19 @@ func (ns *ns) updatePod(oldObj, newObj *coreapi.Pod) error { return err } } + + if !ns.legacy && ns.podSelectors.dstSelectorExist(ps) { + switch { + case !oldMatch && newMatch: + if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, oldObj.Status.PodIP); err != nil { + return err + } + case oldMatch && !newMatch: + if err := ns.addToDefaultAllowIfNoMatching(newObj); err != nil { + return err + } + } + } } } @@ -158,124 +289,145 @@ func (ns *ns) deletePod(obj *coreapi.Pod) error { if ns.checkLocalPod(obj) { ns.ips.DelEntry(LocalIpset, obj.Status.PodIP) } + + if !ns.legacy { + if err := ns.ips.DelEntryIfExists(ns.defaultAllowIPSet, obj.Status.PodIP); err != nil { + return err + } + } + return ns.podSelectors.delFromMatching(obj.ObjectMeta.Labels, obj.Status.PodIP) } -func (ns *ns) addNetworkPolicy(obj *extnapi.NetworkPolicy) error { - ns.policies[obj.ObjectMeta.UID] = obj - +func (ns *ns) addNetworkPolicy(obj interface{}) error { // Analyse policy, determine which rules and ipsets are required - rules, nsSelectors, podSelectors, err := ns.analysePolicy(obj) + + uid, rules, nsSelectors, podSelectors, err := ns.analyse(obj) if err != nil { return err } // Provision required resources in dependency order - if err := ns.nsSelectors.provision(obj.ObjectMeta.UID, nil, nsSelectors); err != nil { + if err := ns.nsSelectors.provision(uid, nil, nsSelectors); err != nil { return err } - if err := ns.podSelectors.provision(obj.ObjectMeta.UID, nil, podSelectors); err != nil { + if err := ns.podSelectors.provision(uid, nil, podSelectors); err != nil { return err } - return ns.rules.provision(obj.ObjectMeta.UID, nil, rules) + return ns.rules.provision(uid, nil, rules) } -func (ns *ns) updateNetworkPolicy(oldObj, newObj *extnapi.NetworkPolicy) error { - delete(ns.policies, oldObj.ObjectMeta.UID) - ns.policies[newObj.ObjectMeta.UID] = newObj - +func (ns *ns) updateNetworkPolicy(oldObj, newObj interface{}) error { // Analyse the old and the new policy so we can determine differences - oldRules, oldNsSelectors, oldPodSelectors, err := ns.analysePolicy(oldObj) + oldUID, oldRules, oldNsSelectors, oldPodSelectors, err := ns.analyse(oldObj) if err != nil { return err } - newRules, newNsSelectors, newPodSelectors, err := ns.analysePolicy(newObj) + newUID, newRules, newNsSelectors, newPodSelectors, err := ns.analyse(newObj) if err != nil { return err } + delete(ns.policies, oldUID) + ns.policies[newUID] = newObj + // Deprovision unused and provision newly required resources in dependency order - if err := ns.rules.deprovision(oldObj.ObjectMeta.UID, oldRules, newRules); err != nil { + if err := ns.rules.deprovision(oldUID, oldRules, newRules); err != nil { return err } - if err := ns.nsSelectors.deprovision(oldObj.ObjectMeta.UID, oldNsSelectors, newNsSelectors); err != nil { + if err := ns.nsSelectors.deprovision(oldUID, oldNsSelectors, newNsSelectors); err != nil { return err } - if err := ns.podSelectors.deprovision(oldObj.ObjectMeta.UID, oldPodSelectors, newPodSelectors); err != nil { + if err := ns.podSelectors.deprovision(oldUID, oldPodSelectors, newPodSelectors); err != nil { return err } - if err := ns.nsSelectors.provision(oldObj.ObjectMeta.UID, oldNsSelectors, newNsSelectors); err != nil { + if err := ns.nsSelectors.provision(oldUID, oldNsSelectors, newNsSelectors); err != nil { return err } - if err := ns.podSelectors.provision(oldObj.ObjectMeta.UID, oldPodSelectors, newPodSelectors); err != nil { + if err := ns.podSelectors.provision(oldUID, oldPodSelectors, newPodSelectors); err != nil { return err } - return ns.rules.provision(oldObj.ObjectMeta.UID, oldRules, newRules) + return ns.rules.provision(oldUID, oldRules, newRules) } -func (ns *ns) deleteNetworkPolicy(obj *extnapi.NetworkPolicy) error { - delete(ns.policies, obj.ObjectMeta.UID) - +func (ns *ns) deleteNetworkPolicy(obj interface{}) error { // Analyse network policy to free resources - rules, nsSelectors, podSelectors, err := ns.analysePolicy(obj) + uid, rules, nsSelectors, podSelectors, err := ns.analyse(obj) if err != nil { return err } + delete(ns.policies, uid) + // Deprovision unused resources in dependency order - if err := ns.rules.deprovision(obj.ObjectMeta.UID, rules, nil); err != nil { + if err := ns.rules.deprovision(uid, rules, nil); err != nil { return err } - if err := ns.nsSelectors.deprovision(obj.ObjectMeta.UID, nsSelectors, nil); err != nil { + if err := ns.nsSelectors.deprovision(uid, nsSelectors, nil); err != nil { return err } - return ns.podSelectors.deprovision(obj.ObjectMeta.UID, podSelectors, nil) + return ns.podSelectors.deprovision(uid, podSelectors, nil) } func bypassRule(nsIpsetName ipset.Name, namespace string) []string { return []string{"-m", "set", "--match-set", string(nsIpsetName), "dst", "-j", "ACCEPT", "-m", "comment", "--comment", "DefaultAllow isolation for namespace: " + namespace} } -func (ns *ns) ensureBypassRule(nsIpsetName ipset.Name) error { - common.Log.Debugf("ensuring rule for DefaultAllow in namespace: %s, set %s", ns.name, nsIpsetName) - return ns.ipt.Append(TableFilter, DefaultChain, bypassRule(ns.allPods.ipsetName, ns.name)...) +func (ns *ns) ensureBypassRule() error { + var ipset ipset.Name + if ns.legacy { + ipset = ns.allPods.ipsetName + } else { + ipset = ns.defaultAllowIPSet + } + + common.Log.Debugf("ensuring rule for DefaultAllow in namespace: %s, set %s", ns.name, ipset) + return ns.ipt.Append(TableFilter, DefaultChain, bypassRule(ipset, ns.name)...) } -func (ns *ns) deleteBypassRule(nsIpsetName ipset.Name) error { - common.Log.Debugf("removing default rule in namespace: %s, set %s", ns.name, nsIpsetName) - return ns.ipt.Delete(TableFilter, DefaultChain, bypassRule(ns.allPods.ipsetName, ns.name)...) +func (ns *ns) deleteBypassRule() error { + var ipset ipset.Name + if ns.legacy { + ipset = ns.allPods.ipsetName + } else { + ipset = ns.defaultAllowIPSet + } + + common.Log.Debugf("removing default rule in namespace: %s, set %s", ns.name, ipset) + return ns.ipt.Delete(TableFilter, DefaultChain, bypassRule(ipset, ns.name)...) } func (ns *ns) addNamespace(obj *coreapi.Namespace) error { ns.namespace = obj // Insert a rule to bypass policies if namespace is DefaultAllow - if !isDefaultDeny(obj) { - if err := ns.ensureBypassRule(ns.allPods.ipsetName); err != nil { + if !ns.isDefaultDeny(obj) { + if err := ns.ensureBypassRule(); err != nil { return err } } // Add namespace ipset to matching namespace selectors - return ns.nsSelectors.addToMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName), namespaceComment(ns)) + _, err := ns.nsSelectors.addToMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName), namespaceComment(ns)) + return err } func (ns *ns) updateNamespace(oldObj, newObj *coreapi.Namespace) error { ns.namespace = newObj // Update bypass rule if ingress default has changed - oldDefaultDeny := isDefaultDeny(oldObj) - newDefaultDeny := isDefaultDeny(newObj) + oldDefaultDeny := ns.isDefaultDeny(oldObj) + newDefaultDeny := ns.isDefaultDeny(newObj) if oldDefaultDeny != newDefaultDeny { common.Log.Infof("namespace DefaultDeny changed from %t to %t", oldDefaultDeny, newDefaultDeny) if oldDefaultDeny { - if err := ns.ensureBypassRule(ns.allPods.ipsetName); err != nil { + if err := ns.ensureBypassRule(); err != nil { return err } } if newDefaultDeny { - if err := ns.deleteBypassRule(ns.allPods.ipsetName); err != nil { + if err := ns.deleteBypassRule(); err != nil { return err } } @@ -309,8 +461,8 @@ func (ns *ns) deleteNamespace(obj *coreapi.Namespace) error { ns.namespace = nil // Remove bypass rule - if !isDefaultDeny(obj) { - if err := ns.deleteBypassRule(ns.allPods.ipsetName); err != nil { + if !ns.isDefaultDeny(obj) { + if err := ns.deleteBypassRule(); err != nil { return err } } @@ -319,27 +471,14 @@ func (ns *ns) deleteNamespace(obj *coreapi.Namespace) error { return ns.nsSelectors.delFromMatching(obj.ObjectMeta.Labels, string(ns.allPods.ipsetName)) } -func hasIP(pod *coreapi.Pod) bool { - // Ensure pod isn't dead, has an IP address and isn't sharing the host network namespace - return pod.Status.Phase != "Succeeded" && pod.Status.Phase != "Failed" && - len(pod.Status.PodIP) > 0 && !pod.Spec.HostNetwork -} - -func equals(a, b map[string]string) bool { - if len(a) != len(b) { +func (ns *ns) isDefaultDeny(namespace *coreapi.Namespace) bool { + nnpJSON, found := namespace.ObjectMeta.Annotations["net.beta.kubernetes.io/network-policy"] + if !found { return false } - for ak, av := range a { - if b[ak] != av { - return false - } - } - return true -} -func isDefaultDeny(namespace *coreapi.Namespace) bool { - nnpJSON, found := namespace.ObjectMeta.Annotations["net.beta.kubernetes.io/network-policy"] - if !found { + if !ns.legacy { + common.Log.Warn("DefaultDeny annotation is supported only in legacy mode (--use-legacy-netpol)") return false } @@ -355,6 +494,24 @@ func isDefaultDeny(namespace *coreapi.Namespace) bool { *(nnp.Ingress.Isolation) == DefaultDeny } +func hasIP(pod *coreapi.Pod) bool { + // Ensure pod isn't dead, has an IP address and isn't sharing the host network namespace + return pod.Status.Phase != "Succeeded" && pod.Status.Phase != "Failed" && + len(pod.Status.PodIP) > 0 && !pod.Spec.HostNetwork +} + +func equals(a, b map[string]string) bool { + if len(a) != len(b) { + return false + } + for ak, av := range a { + if b[ak] != av { + return false + } + } + return true +} + func namespaceComment(namespace *ns) string { return "namespace: " + namespace.name } @@ -362,3 +519,36 @@ func namespaceComment(namespace *ns) string { func podComment(pod *coreapi.Pod) string { return fmt.Sprintf("namespace: %s, pod: %s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) } + +func (ns *ns) analyse(obj interface{}) ( + uid types.UID, + rules map[string]*ruleSpec, + nsSelectors, podSelectors map[string]*selectorSpec, + err error) { + + switch p := obj.(type) { + case *extnapi.NetworkPolicy: + uid = p.ObjectMeta.UID + case *networkingv1.NetworkPolicy: + uid = p.ObjectMeta.UID + default: + err = errInvalidNetworkPolicyObjType + return + } + ns.policies[uid] = obj + + // Analyse policy, determine which rules and ipsets are required + if ns.legacy { + rules, nsSelectors, podSelectors, err = ns.analysePolicyLegacy(obj.(*extnapi.NetworkPolicy)) + if err != nil { + return + } + } else { + rules, nsSelectors, podSelectors, err = ns.analysePolicy(obj.(*networkingv1.NetworkPolicy)) + if err != nil { + return + } + } + + return +} diff --git a/npc/selector.go b/npc/selector.go index 9896096b1b..bb0d0cb5af 100644 --- a/npc/selector.go +++ b/npc/selector.go @@ -12,13 +12,14 @@ import ( type selectorSpec struct { key string // string representation (for hash keying/equality comparison) selector labels.Selector // k8s Selector object (for matching) + dst bool // destination selector ipsetType ipset.Type // type of ipset to provision ipsetName ipset.Name // generated ipset name nsName string // Namespace name } -func newSelectorSpec(json *metav1.LabelSelector, nsName string, ipsetType ipset.Type) (*selectorSpec, error) { +func newSelectorSpec(json *metav1.LabelSelector, dst bool, nsName string, ipsetType ipset.Type) (*selectorSpec, error) { selector, err := metav1.LabelSelectorAsSelector(json) if err != nil { return nil, err @@ -27,6 +28,7 @@ func newSelectorSpec(json *metav1.LabelSelector, nsName string, ipsetType ipset. return &selectorSpec{ key: key, selector: selector, + dst: dst, // We prefix the selector string with the namespace name when generating // the shortname because you can specify the same selector in multiple // namespaces - we need those to map to distinct ipsets @@ -57,27 +59,45 @@ type selectorFn func(selector *selector) error type selectorSet struct { ips ipset.Interface onNewSelector selectorFn - users map[string]map[types.UID]struct{} // list of users per selector - entries map[string]*selector + + // invoked after dst selector has been provisioned for the first time + onNewDstSelector selectorFn + // invoked after the last instance of dst selector has been deprovisioned + onDestroyDstSelector selectorFn + + users map[string]map[types.UID]struct{} // list of users per selector + entries map[string]*selector + + // We need to keep track of dst selector instances to be able to invoke + // onNewDstSelector and onDestroyDstSelector callbacks at the right time; + // selectorSpec.Key -> count + dstSelectorsCount map[string]int } -func newSelectorSet(ips ipset.Interface, onNewSelector selectorFn) *selectorSet { +func newSelectorSet(ips ipset.Interface, onNewSelector, onNewDstSelector selectorFn, onDestroyDstSelector selectorFn) *selectorSet { return &selectorSet{ - ips: ips, - onNewSelector: onNewSelector, - users: make(map[string]map[types.UID]struct{}), - entries: make(map[string]*selector)} + ips: ips, + onNewSelector: onNewSelector, + onNewDstSelector: onNewDstSelector, + onDestroyDstSelector: onDestroyDstSelector, + users: make(map[string]map[types.UID]struct{}), + entries: make(map[string]*selector), + dstSelectorsCount: make(map[string]int)} } -func (ss *selectorSet) addToMatching(labelMap map[string]string, entry string, comment string) error { +func (ss *selectorSet) addToMatching(labelMap map[string]string, entry string, comment string) (bool, error) { + found := false for _, s := range ss.entries { if s.matches(labelMap) { + if ss.dstSelectorExist(s) { + found = true + } if err := s.addEntry(entry, comment); err != nil { - return err + return found, err } } } - return nil + return found, nil } func (ss *selectorSet) delFromMatching(labelMap map[string]string, entry string) error { @@ -91,6 +111,10 @@ func (ss *selectorSet) delFromMatching(labelMap map[string]string, entry string) return nil } +func (ss *selectorSet) dstSelectorExist(s *selector) bool { + return ss.dstSelectorsCount[s.spec.key] > 0 +} + func (ss *selectorSet) deprovision(user types.UID, current, desired map[string]*selectorSpec) error { for key, spec := range current { if _, found := desired[key]; !found { @@ -100,9 +124,19 @@ func (ss *selectorSet) deprovision(user types.UID, current, desired map[string]* if err := ss.ips.Destroy(spec.ipsetName); err != nil { return err } + delete(ss.entries, key) delete(ss.users, key) } + + if spec.dst { + ss.dstSelectorsCount[key]-- + if ss.dstSelectorsCount[key] == 0 { + if err := ss.onDestroyDstSelector(&selector{ss.ips, spec}); err != nil { + return err + } + } + } } } return nil @@ -111,12 +145,13 @@ func (ss *selectorSet) deprovision(user types.UID, current, desired map[string]* func (ss *selectorSet) provision(user types.UID, current, desired map[string]*selectorSpec) error { for key, spec := range desired { if _, found := current[key]; !found { + selector := &selector{ss.ips, spec} + if _, found := ss.users[key]; !found { common.Log.Infof("creating ipset: %#v", spec) if err := ss.ips.Create(spec.ipsetName, spec.ipsetType); err != nil { return err } - selector := &selector{ss.ips, spec} if err := ss.onNewSelector(selector); err != nil { return err } @@ -124,6 +159,15 @@ func (ss *selectorSet) provision(user types.UID, current, desired map[string]*se ss.entries[key] = selector } ss.users[key][user] = struct{}{} + + if spec.dst { + ss.dstSelectorsCount[key]++ + if ss.dstSelectorsCount[key] == 1 { + if err := ss.onNewDstSelector(selector); err != nil { + return err + } + } + } } } return nil diff --git a/prog/weave-kube/weave-daemonset-k8s-1.7.yaml b/prog/weave-kube/weave-daemonset-k8s-1.7.yaml index dad670b3be..724cefa6de 100644 --- a/prog/weave-kube/weave-daemonset-k8s-1.7.yaml +++ b/prog/weave-kube/weave-daemonset-k8s-1.7.yaml @@ -33,6 +33,14 @@ items: - get - list - watch + - apiGroups: + - 'networking.k8s.io' + resources: + - networkpolicies + verbs: + - get + - list + - watch - apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding metadata: diff --git a/prog/weave-npc/main.go b/prog/weave-npc/main.go index eff0ef0715..01b5266c89 100644 --- a/prog/weave-npc/main.go +++ b/prog/weave-npc/main.go @@ -9,6 +9,7 @@ import ( "github.com/spf13/cobra" coreapi "k8s.io/api/core/v1" extnapi "k8s.io/api/extensions/v1beta1" + networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" @@ -29,6 +30,7 @@ var ( logLevel string allowMcast bool nodeName string + legacy bool ) func handleError(err error) { common.CheckFatal(err) } @@ -117,6 +119,8 @@ func createBaseRules(ipt *iptables.IPTables, ips ipset.Interface) error { } func root(cmd *cobra.Command, args []string) { + var npController cache.Controller + common.SetLogLevel(logLevel) if nodeName == "" { // HOSTNAME is set by Kubernetes for pods in the host network namespace @@ -127,6 +131,10 @@ func root(cmd *cobra.Command, args []string) { } common.Log.Infof("Starting Weaveworks NPC %s; node name %q", version, nodeName) + if legacy { + common.Log.Info("Running in legacy mode (k8s pre-1.7 network policy semantics)") + } + if err := metrics.Start(metricsAddr); err != nil { common.Log.Fatalf("Failed to start metrics: %v", err) } @@ -150,7 +158,7 @@ func root(cmd *cobra.Command, args []string) { handleError(resetIPSets(ips)) handleError(createBaseRules(ipt, ips)) - npc := npc.New(nodeName, ipt, ips) + npc := npc.New(nodeName, legacy, ipt, ips) nsController := makeController(client.Core().RESTClient(), "namespaces", &coreapi.Namespace{}, cache.ResourceEventHandlerFuncs{ @@ -192,25 +200,30 @@ func root(cmd *cobra.Command, args []string) { handleError(npc.UpdatePod(old.(*coreapi.Pod), new.(*coreapi.Pod))) }}) - npController := makeController(client.Extensions().RESTClient(), "networkpolicies", &extnapi.NetworkPolicy{}, - cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - handleError(npc.AddNetworkPolicy(obj.(*extnapi.NetworkPolicy))) - }, - DeleteFunc: func(obj interface{}) { - switch obj := obj.(type) { - case *extnapi.NetworkPolicy: - handleError(npc.DeleteNetworkPolicy(obj)) - case cache.DeletedFinalStateUnknown: - // We know this object has gone away, but its final state is no longer - // available from the API server. Instead we use the last copy of it - // that we have, which is good enough for our cleanup. - handleError(npc.DeleteNetworkPolicy(obj.Obj.(*extnapi.NetworkPolicy))) - } - }, - UpdateFunc: func(old, new interface{}) { - handleError(npc.UpdateNetworkPolicy(old.(*extnapi.NetworkPolicy), new.(*extnapi.NetworkPolicy))) - }}) + npHandlers := cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + handleError(npc.AddNetworkPolicy(obj)) + }, + DeleteFunc: func(obj interface{}) { + switch obj := obj.(type) { + case cache.DeletedFinalStateUnknown: + // We know this object has gone away, but its final state is no longer + // available from the API server. Instead we use the last copy of it + // that we have, which is good enough for our cleanup. + handleError(npc.DeleteNetworkPolicy(obj.Obj)) + default: + handleError(npc.DeleteNetworkPolicy(obj)) + } + }, + UpdateFunc: func(old, new interface{}) { + handleError(npc.UpdateNetworkPolicy(old, new)) + }, + } + if legacy { + npController = makeController(client.Extensions().RESTClient(), "networkpolicies", &extnapi.NetworkPolicy{}, npHandlers) + } else { + npController = makeController(client.NetworkingV1().RESTClient(), "networkpolicies", &networkingv1.NetworkPolicy{}, npHandlers) + } go nsController.Run(wait.NeverStop) go podController.Run(wait.NeverStop) @@ -231,6 +244,7 @@ func main() { rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "debug", "logging level (debug, info, warning, error)") rootCmd.PersistentFlags().BoolVar(&allowMcast, "allow-mcast", true, "allow all multicast traffic") rootCmd.PersistentFlags().StringVar(&nodeName, "node-name", "", "only generate rules that apply to this node") + rootCmd.PersistentFlags().BoolVar(&legacy, "use-legacy-netpol", false, "use legacy network policies (pre k8s 1.7 vsn)") handleError(rootCmd.Execute()) } diff --git a/test/840_weave_kube_3_test.sh b/test/840_weave_kube_3_test.sh index c5a025958a..4f6e56c954 100755 --- a/test/840_weave_kube_3_test.sh +++ b/test/840_weave_kube_3_test.sh @@ -19,6 +19,7 @@ SUCCESS="$(( $NUM_HOSTS * ($NUM_HOSTS-1) )) established" KUBECTL="sudo kubectl --kubeconfig /etc/kubernetes/admin.conf" KUBE_PORT=6443 IMAGE=weaveworks/network-tester:latest +DOMAIN=nettest.default.svc.cluster.local. tear_down_kubeadm @@ -47,6 +48,7 @@ fi # Ensure Kubernetes uses locally built container images and inject code coverage environment variable (or do nothing depending on $COVERAGE): sed -e "s%imagePullPolicy: Always%imagePullPolicy: Never%" \ -e "s%env:%$COVERAGE_ARGS%" \ + -e "s%#npc-args% args:\n - '--use-legacy-netpol'%" \ "$(dirname "$0")/../prog/weave-kube/weave-daemonset-k8s-1.7.yaml" | run_on "$HOST1" "$KUBECTL apply -n kube-system -f -" sleep 5 @@ -115,7 +117,7 @@ check_ready() { assert_raises 'wait_for_x check_ready "hosts to be ready"' # See if we can get some pods running that connect to the network -run_on $HOST1 "$KUBECTL run --image-pull-policy=Never nettest --image=$IMAGE --replicas=3 -- -peers=3 -dns-name=nettest.default.svc.cluster.local." +run_on $HOST1 "$KUBECTL run --image-pull-policy=Never nettest --image=$IMAGE --replicas=3 -- -peers=3 -dns-name=$DOMAIN" # Create a headless service so they can be found in Kubernetes DNS run_on $HOST1 "$KUBECTL create -f -" <