Skip to content

Commit

Permalink
Update NetworkPolicy code for v1 semantics
Browse files Browse the repository at this point in the history
  • Loading branch information
danwinship committed Jun 9, 2017
1 parent 943154e commit 8be5204
Showing 1 changed file with 43 additions and 71 deletions.
114 changes: 43 additions & 71 deletions pkg/sdn/plugin/networkpolicy.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package plugin

import (
"encoding/json"
"fmt"
"reflect"
"sort"
Expand All @@ -15,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -44,11 +44,10 @@ type networkPolicyPlugin struct {

// npNamespace tracks NetworkPolicy-related data for a Namespace
type npNamespace struct {
name string
vnid uint32
isolated bool
refs int
inUse bool
name string
vnid uint32
refs int
inUse bool

policies map[ktypes.UID]*npPolicy
}
Expand All @@ -59,7 +58,8 @@ type npPolicy struct {
watchesNamespaces bool
watchesPods bool

flows []string
flows []string
selectedIPs []string
}

func NewNetworkPolicyPlugin() osdnPolicy {
Expand Down Expand Up @@ -120,7 +120,6 @@ func (np *networkPolicyPlugin) initNamespaces() error {
np.namespaces[vnid] = &npNamespace{
name: ns.Name,
vnid: vnid,
isolated: namespaceIsIsolated(&ns),
refs: 0,
policies: make(map[ktypes.UID]*npPolicy),
}
Expand Down Expand Up @@ -155,15 +154,9 @@ func (np *networkPolicyPlugin) AddNetNamespace(netns *osapi.NetNamespace) {
return
}

isolated := false
if kns, exists := np.kNamespaces[netns.NetName]; exists {
isolated = namespaceIsIsolated(&kns)
}

np.namespaces[netns.NetID] = &npNamespace{
name: netns.NetName,
vnid: netns.NetID,
isolated: isolated,
refs: 0,
policies: make(map[ktypes.UID]*npPolicy),
}
Expand Down Expand Up @@ -206,14 +199,39 @@ func (np *networkPolicyPlugin) syncNamespace(npns *npNamespace) {
otx := np.node.oc.NewTransaction()
otx.DeleteFlows("table=80, reg1=%d", npns.vnid)
if inUse {
if npns.isolated {
allPodsSelected := false

// Add "allow" rules for all traffic allowed by a NetworkPolicy
for _, npp := range npns.policies {
for _, flow := range npp.flows {
otx.AddFlow("table=80, priority=150, reg1=%d, %s actions=output:NXM_NX_REG2[]", npns.vnid, flow)
}
if npp.selectedIPs == nil {
allPodsSelected = true
}
}

if allPodsSelected {
// Some policy selects all pods, so all pods are "isolated" and no
// traffic is allowed beyond what we explicitly allowed above. (And
// the "priority=0, actions=drop" rule will filter out all remaining
// traffic in this Namespace).
} else {
// No policy selects all pods, so we need an "else accept" rule to
// allow traffic to pod IPs that aren't selected by a policy. But
// before that we need rules to drop any remaining traffic for any pod
// IP that *is* selected by a policy.
selectedIPs := sets.NewString()
for _, npp := range npns.policies {
for _, flow := range npp.flows {
otx.AddFlow("table=80, priority=100, reg1=%d, %s actions=output:NXM_NX_REG2[]", npns.vnid, flow)
for _, ip := range npp.selectedIPs {
if !selectedIPs.Has(ip) {
selectedIPs.Insert(ip)
otx.AddFlow("table=80, priority=100, reg1=%d, ip, nw_dst=%s, actions=drop", npns.vnid, ip)
}
}
}
} else {
otx.AddFlow("table=80, priority=100, reg1=%d, actions=output:NXM_NX_REG2[]", npns.vnid)

otx.AddFlow("table=80, priority=50, reg1=%d, actions=output:NXM_NX_REG2[]", npns.vnid)
}
}
if err := otx.EndTransaction(); err != nil {
Expand Down Expand Up @@ -292,10 +310,12 @@ func (np *networkPolicyPlugin) parseNetworkPolicy(npns *npNamespace, policy *ext
var destFlows []string
if len(policy.Spec.PodSelector.MatchLabels) > 0 || len(policy.Spec.PodSelector.MatchExpressions) > 0 {
npp.watchesPods = true
for _, ip := range np.selectPods(npns, &policy.Spec.PodSelector) {
npp.selectedIPs = np.selectPods(npns, &policy.Spec.PodSelector)
for _, ip := range npp.selectedIPs {
destFlows = append(destFlows, fmt.Sprintf("ip, nw_dst=%s, ", ip))
}
} else {
npp.selectedIPs = nil
destFlows = []string{""}
}

Expand Down Expand Up @@ -419,40 +439,6 @@ func (np *networkPolicyPlugin) watchNetworkPolicies() {
})
}

const (
NetworkPolicyAnnotation = "net.beta.kubernetes.io/network-policy"
)

type IngressIsolationPolicy string

const (
DefaultDeny IngressIsolationPolicy = "DefaultDeny"
)

type NamespaceNetworkPolicy struct {
Ingress *NamespaceIngressPolicy `json:"ingress,omitempty"`
}

type NamespaceIngressPolicy struct {
Isolation *IngressIsolationPolicy `json:"isolation,omitempty"`
}

func namespaceIsIsolated(ns *kapi.Namespace) bool {
annotation, exists := ns.Annotations[NetworkPolicyAnnotation]
if !exists {
return false
}
var policy NamespaceNetworkPolicy
if err := json.Unmarshal([]byte(annotation), &policy); err != nil {
glog.Warningf("Namespace %q has unparsable %q annotation %q", ns.Name, NetworkPolicyAnnotation, annotation)
return false
} else if policy.Ingress != nil && *policy.Ingress.Isolation == DefaultDeny {
return true
} else {
return false
}
}

func (np *networkPolicyPlugin) watchPods() {
RegisterSharedInformerEventHandlers(np.kubeInformers,
np.handleAddOrUpdatePod, np.handleDeletePod, Pods)
Expand Down Expand Up @@ -510,36 +496,22 @@ func (np *networkPolicyPlugin) watchNamespaces() {
func (np *networkPolicyPlugin) handleAddOrUpdateNamespace(obj, _ interface{}, eventType watch.EventType) {
ns := obj.(*kapi.Namespace)
glog.V(5).Infof("Watch %s event for Namespace %q", eventType, ns.Name)
// Don't grab the lock yet since this may block
vnid, err := np.vnids.WaitAndGetVNID(ns.Name)
if err != nil {
glog.Error(err)
return
}

np.lock.Lock()
defer np.lock.Unlock()
np.kNamespaces[ns.Name] = *ns
if npns, exists := np.namespaces[vnid]; exists {
npns.isolated = namespaceIsIsolated(ns)
np.syncNamespace(npns)
}
// else the NetNamespace doesn't exist yet, but we will initialize
// npns.isolated from the kapi.Namespace when it's created

np.kNamespaces[ns.Name] = *ns
np.refreshNetworkPolicies(Namespaces)
}

func (np *networkPolicyPlugin) handleDeleteNamespace(obj interface{}) {
ns := obj.(*kapi.Namespace)
glog.V(5).Infof("Watch %s event for Namespace %q", watch.Deleted, ns.Name)

np.lock.Lock()
defer np.lock.Unlock()
delete(np.kNamespaces, ns.Name)

// We don't need to np.syncNamespace() because if the NetNamespace
// still existed, it will be deleted as part of deleting the Namespace.

delete(np.kNamespaces, ns.Name)
np.refreshNetworkPolicies(Namespaces)
}

Expand Down

0 comments on commit 8be5204

Please sign in to comment.