Process network policies for pods
Add logic for processing network policy objects.
This includes:
- ensuring traffic is whitelisted for pods in non-isolated namespaces
- ensuring a default 'drop' ACL is added for each pod in an isolated
  namespace
- translating Kubernetes network policies into a pseudo-ACL
  representation of OVN match rules (see the sketch after this list)
- determining which policies apply to a pod and translating pseudo-ACLs
  into actual ACLs upon pod creation
- removing all ACLs for a pod upon pod deletion
- creating and maintaining OVN address sets for the IP addresses of pods
  that match the from clause of network policy rules
- monitoring transitions in the namespace isolation property and reacting
  accordingly
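
A minimal sketch of the translation idea follows. The function names,
pseudo-ACL structure, and address set naming are illustrative
assumptions, not the exact representation introduced by this patch:

    # Hypothetical sketch: an ingress rule becomes a port-independent
    # pseudo-ACL; at pod creation the pseudo-ACL is bound to the pod's
    # logical port to yield a concrete OVN ACL.

    def build_pseudo_acl(policy_data):
        rule = policy_data['ingress'][0]
        port = rule['ports'][0]['port']
        # Pods matching the rule's from clause are tracked in an OVN
        # address set, referenced in the match through the $ prefix
        address_set = "as_%s_%s" % (policy_data['metadata']['namespace'],
                                    policy_data['metadata']['name'])
        return {'direction': 'to-lport',
                'match': 'tcp && tcp.dst == %d && ip4.src == $%s' % (
                    port, address_set),
                'action': 'allow-related'}

    def pseudo_acl_to_acl(pseudo_acl, logical_port):
        acl = dict(pseudo_acl)
        acl['match'] = 'outport == "%s" && %s' % (logical_port,
                                                  acl['match'])
        return acl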

Also, the pod watcher now keeps track of pod-IP mappings. Because the pod
IP address is removed from the pod object before the DELETED event is
generated, neither the pod data nor its cached version contains a pod IP
when this event occurs. The pod IP is required to update the network
policy address sets upon pod deletion.
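
A condensed sketch of that bookkeeping (names are hypothetical; the
actual watcher code is in the pod_watcher.py diff below):

    # Remember the IP while the pod still reports one; on DELETED the
    # pod object no longer carries status.podIP, so fall back to the map.
    pod_ips = {}

    def observe_pod(event_type, pod_data):
        uid = pod_data['metadata']['uid']
        if event_type != 'DELETED':
            pod_ip = pod_data['status'].get('podIP')
            if pod_ip:
                pod_ips[uid] = pod_ip
            return None
        return pod_ips.pop(uid, None)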

As a part of this patch, the signatures of the create_logical_port and
delete_logical_port methods in the ovn_k8s.modes.overlay.OvnNB class
have been changed to accept pod data (and the pod IP for the delete
method) rather than an event.
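
Callers now unpack the event payload themselves, roughly as follows
(condensed from the conn_processor.py diff below):

    pod_data, pod_ip = event.metadata
    if event.event_type == "DELETED":
        self.mode.delete_logical_port(pod_data, pod_ip,
                                      variables.K8S_WATCH_POLICIES)
    else:
        self.mode.create_logical_port(pod_data,
                                      variables.K8S_WATCH_POLICIES)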

Signed-off-by: Salvatore Orlando <salv.orlando@gmail.com>
salv-orlando committed Dec 30, 2016
1 parent 1f929ae commit 0cf76dd
Showing 4 changed files with 194 additions and 26 deletions.
16 changes: 6 additions & 10 deletions ovn_k8s/modes/overlay.py
@@ -138,8 +138,7 @@ def _get_switch_gateway_ip(self, logical_switch):
     def _build_logical_port_name(self, namespace, pod_name):
         return "%s_%s" % (namespace, pod_name)
 
-    def create_logical_port(self, event, create_network_policies=False):
-        data = event.metadata
+    def create_logical_port(self, data, create_network_policies=False):
         logical_switch = data['spec']['nodeName']
         pod_name = data['metadata']['name']
         namespace = data['metadata']['namespace']
@@ -218,8 +217,7 @@ def create_logical_port(self, event, create_network_policies=False):
 
         vlog.info("created logical port %s" % (logical_port))
 
-    def delete_logical_port(self, event, delete_network_policies):
-        data = event.metadata
+    def delete_logical_port(self, data, pod_ip, delete_network_policies):
         pod_id = data['metadata']['uid']
         pod_name = data['metadata']['name']
         namespace = data['metadata']['namespace']
@@ -243,7 +241,7 @@ def delete_logical_port(self, event, delete_network_policies):
         vlog.dbg("Pod: %s (Namespace: %s) - ACLs for pod removed "
                  % (pod_name, namespace))
         # Remove references to pod from address sets
-        self.delete_pod_from_address_sets(data)
+        self.delete_pod_from_address_sets(data, pod_ip)
         vlog.dbg("Pod: %s (Namespace: %s) - Pod IP removed from address "
                  "sets" % (pod_name, namespace))
@@ -816,16 +814,14 @@ def add_pods_to_policy_address_sets(self, policy_data):
                 if self._pod_matches_from_clause(pod_data, ns_data, rule):
                     self.add_to_address_set(pod_ip, policy_ns, policy_data)
 
-    def delete_pod_from_address_sets(self, pod_data):
+    def delete_pod_from_address_sets(self, pod_data, pod_ip=None):
         namespace = pod_data['metadata']['namespace']
         policies = kubernetes.get_network_policies(
             variables.K8S_API_SERVER,
             namespace)
         # NOTE: Removing the pod IP from every address set is not harmful but
         # can be optimized by removing it only from the address sets that
         # match the policy rule
+        pod_ip = pod_ip or pod_data['status']['podIP']
         for policy_data in policies:
-            self.remove_from_address_set(
-                pod_data['status']['podIP'],
-                namespace,
-                policy_data)
+            self.remove_from_address_set(pod_ip, namespace, policy_data)
11 changes: 7 additions & 4 deletions ovn_k8s/processor/conn_processor.py
@@ -24,12 +24,15 @@
 class ConnectivityProcessor(ovn_k8s.processor.BaseProcessor):
 
     def _process_pod_event(self, event):
+        vlog.dbg("Received a pod %s event %s" % (event.event_type,
+                                                 event.metadata))
+        pod_data, pod_ip = event.metadata
         if event.event_type == "DELETED":
-            vlog.dbg("Received a pod delete event %s" % (event.metadata))
-            self.mode.delete_logical_port(event)
+            self.mode.delete_logical_port(pod_data, pod_ip,
+                                          variables.K8S_WATCH_POLICIES)
         else:
-            vlog.dbg("Received a pod ADD/MODIFY event %s" % (event.metadata))
-            self.mode.create_logical_port(event, variables.K8S_WATCH_POLICIES)
+            self.mode.create_logical_port(pod_data,
+                                          variables.K8S_WATCH_POLICIES)
 
     def _process_service_event(self, event):
         if event.event_type == "DELETED":
180 changes: 170 additions & 10 deletions ovn_k8s/processor/policy_processor.py
@@ -14,6 +14,8 @@
 
 from ovs import vlog
 
+from ovn_k8s.common import kubernetes
+from ovn_k8s.common import variables
 import ovn_k8s.processor as processor
 import ovn_k8s.watcher.policy_watcher
@@ -28,34 +30,192 @@ def __init__(self, pool):
         self._pool = pool
         self._np_watcher_threads = {}
 
-    def _process_ns_event(self, event):
+    def _process_ns_event(self, event, affected_pods, pod_events):
         log.dbg("Processing event %s from namespace %s" % (
             event.event_type, event.source))
         namespace = event.source
-        # TODO(salv-orlando): handle namespace isolation change events
+
+        def scan_pods():
+            ns_pods = kubernetes.get_pods_by_namespace(
+                variables.K8S_API_SERVER, event.source)
+            if not ns_pods:
+                return
+            for pod in ns_pods:
+                pod_id = pod['metadata']['uid']
+                affected_pods[pod_id] = pod
+                pod_events.setdefault(pod_id, []).append(event)
+
         if event.event_type == 'ADDED':
             log.dbg("Namespace %s added - spawning policy watcher" % namespace)
             watcher_thread = pw.create_namespace_watcher(
                 namespace, self._pool)
             self._np_watcher_threads[namespace] = watcher_thread
+            # Upon restart this event will be received for existing
+            # namespaces. The namespace isolation status might have changed
+            # while the watcher was not running. Pods in the namespace need
+            # to be checked again
+            scan_pods()
         elif event.event_type == 'DELETED':
             watcher_thread = self._np_watcher_threads.pop(namespace)
             pw.remove_namespace_watcher(watcher_thread)
+        elif event.event_type == 'MODIFIED':
+            # This is a transition in the namespace isolation status. All
+            # the pods in the namespace are affected
+            scan_pods()
+
+    def _process_pod_event(self, event, affected_pods, pod_events):
+        log.dbg("Processing event %s from pod %s" % (
+            event.event_type, event.source))
+        pod_data = event.metadata
+        # Pods are affected only if the namespace is isolated
+        if not kubernetes.is_namespace_isolated(
+                variables.K8S_API_SERVER, pod_data['metadata']['namespace']):
+            log.dbg("Namespace %s for pod %s is not isolated, no further "
+                    "processing required" % (
+                        pod_data['metadata']['namespace'], event.source))
+            return
+        # ACLs for this pod must be recalculated as label changes might
+        # change the set of policies that apply to the pod
+        pod_id = pod_data['metadata']['uid']
+        affected_pods[pod_id] = pod_data
+        pod_events.setdefault(pod_id, []).append(event)
+        # A pod label change might also imply changes to address sets
+        self.mode.delete_pod_from_address_sets(pod_data)
+        self.mode.add_pod_to_address_sets(pod_data)
+
+    def _process_np_event(self, event, affected_pods, pod_events):
+        log.dbg("Processing event %s from network policy %s" % (
+            event.event_type, event.source))
+        namespace = event.metadata['metadata']['namespace']
+        policy = event.source
+        policy_data = event.metadata
+        policy_id = policy_data['metadata']['uid']
+        if not kubernetes.is_namespace_isolated(variables.K8S_API_SERVER,
+                                                namespace):
+            log.warn("Policy %s applied to non-isolated namespace: %s. "
+                     "Skipping processing" % (policy, namespace))
+            return
+
+        # Retrieve pods matching the policy's pod selector. ACLs for this
+        # policy must be created (or destroyed) for these pods
+        pod_selector = policy_data.get('podSelector', {})
+        pods = kubernetes.get_pods_by_namespace(
+            variables.K8S_API_SERVER,
+            namespace=namespace,
+            pod_selector=pod_selector)
+        for pod in pods:
+            pod_id = pod['metadata']['uid']
+            affected_pods[pod_id] = pod
+            pod_events.setdefault(pod_id, []).append(event)
+
+        if event.event_type == 'DELETED':
+            # Remove pseudo-ACLs for the policy and address sets for all of
+            # its rules
+            self.mode.destroy_address_set(namespace, policy_data)
+            self.mode.remove_pseudo_acls(policy_id)
+            log.dbg("Policy: %s (Namespace: %s): pseudo ACLs and address "
+                    "sets destroyed" % (policy_id, namespace))
+            return
+        else:
+            # As there is no MODIFIED event for policies, if we end up here
+            # we are handling an ADDED event
+            self.mode.create_address_set(namespace, policy_data)
+            self.mode.build_pseudo_acls(policy_data)
+            self.mode.add_pods_to_policy_address_sets(policy_data)
+            log.dbg("Policy: %s (Namespace: %s): pseudo ACLs and address "
+                    "sets created" % (policy_id, namespace))
+
+    def _apply_pod_ns_acls(self, pod_data, pod_events):
+        ns_events = [event for event in pod_events if
+                     isinstance(event, processor.NSEvent)]
+        if not ns_events:
+            return
+        namespace = pod_data['metadata']['namespace']
+        pod_name = pod_data['metadata']['name']
+        pod_id = pod_data['metadata']['uid']
+        log.dbg("Pod: %s (Namespace: %s): Applying ACL changes for "
+                "namespace events" % (pod_name, namespace))
+        # In some rare cases there could be multiple namespace events. The
+        # ns_events list reports them in the same order in which they were
+        # detected and is therefore reliable. We only check first and last
+        # as multiple transitions might nullify each other
+        if ns_events[0] == ns_events[-1]:
+            # Single event
+            ns_data = ns_events[-1].metadata
+            isolated_final = ns_data.get('custom', {}).get('isolated', False)
+        else:
+            ns_data_final = ns_events[-1].metadata
+            isolated_final = ns_data_final.get(
+                'custom', {}).get('isolated', False)
+            ns_data_initial = ns_events[0].metadata
+            isolated_initial = ns_data_initial.get(
+                'custom', {}).get('isolated', False)
+            if isolated_final == isolated_initial:
+                # Nothing actually changed
+                log.dbg("Pod: %s (Namespace: %s): initial and final "
+                        "namespace isolation status are identical (%s) - "
+                        "no processing necessary" %
+                        (pod_name, namespace, isolated_final))
+                return
+        self.mode.remove_acls(pod_id)
+        if not isolated_final:
+            log.dbg("Pod: %s (Namespace: %s): not isolated, "
+                    "whitelisting traffic for pod" % (pod_name, namespace))
+            self.mode.whitelist_pod_traffic(pod_id, namespace, pod_name)
+
+    def _apply_pod_acls(self, pod_data, pod_events, policies):
+        # Do a shallow copy of pod events as the list will be modified
+        pod_events = pod_events[:]
+        pod_name = pod_data['metadata']['name']
+        pod_ns = pod_data['metadata']['namespace']
+
+        log.dbg("Pod: %s (Namespace: %s): applying ACLs..." % (
+            pod_name, pod_ns))
+        # Check for a namespace event. This might mean that there has been
+        # a transition from isolated to non-isolated and therefore all we
+        # have to do is whitelist traffic
+        self._apply_pod_ns_acls(pod_data, pod_events)
+        if kubernetes.is_namespace_isolated(
+                variables.K8S_API_SERVER, pod_ns):
+            # If we are here the namespace is isolated, and policies must
+            # be translated into ACLs
+            self.mode.apply_pod_policy_acls(pod_data, policies)
+        log.dbg("Pod: %s (Namespace: %s): ACLs applied" % (pod_name, pod_ns))
+
     def process_events(self, events):
         log.dbg("Processing %d events from queue" % len(events))
-        for event in events[:]:
+        affected_pods = {}
+        pod_events = {}
+        for event in events:
             if isinstance(event, processor.NSEvent):
                 # namespace add -> create policy watcher
                 # namespace delete -> destroy policy watcher
                 # namespace update -> check isolation property
-                self._process_ns_event(event)
-                events.remove(event)
+                self._process_ns_event(event, affected_pods, pod_events)
+            if isinstance(event, processor.NPEvent):
+                # policy add -> create ACLs for affected pods
+                # policy delete -> remove ACLs for affected pods
+                self._process_np_event(event, affected_pods, pod_events)
+            if isinstance(event, processor.PodEvent):
+                # relevant policies must be applied to the pod
+                # check policies that select the pod in their from clause
+                self._process_pod_event(event, affected_pods, pod_events)
 
-        for event in events:
-            log.warn("Event %s from %s was not processed. ACLs might not be "
-                     "in sync with network policies",
-                     event.event_type, event.source)
-        else:
-            log.info("Event processing terminated.")
+        ns_policy_map = {}
+        for pod_id, pod_data in affected_pods.items():
+            pod_ns = pod_data['metadata']['namespace']
+            policies = ns_policy_map.get(
+                pod_ns, kubernetes.get_network_policies(
+                    variables.K8S_API_SERVER,
+                    pod_data['metadata']['namespace']))
+            ns_policy_map[pod_ns] = policies
+            log.dbg("Rebuilding ACLs for pod: %s because of: %s" %
+                    (pod_id, "; ".join(['%s from %s' % (event.event_type,
+                                                        event.source)
+                                        for event in pod_events[pod_id]])))
+            self._apply_pod_acls(pod_data, pod_events[pod_id], policies)
+
+        log.info("Event processing terminated.")
 
 
 def get_event_queue():
13 changes: 11 additions & 2 deletions ovn_k8s/watcher/pod_watcher.py
@@ -28,11 +28,14 @@ class PodWatcher(object):
     def __init__(self, pod_stream):
        self._pod_stream = pod_stream
        self.pod_cache = {}
+       self.pod_ips = {}
 
    def _send_connectivity_event(self, event_type, pod_name, pod_data):
+       # If available, add the pod IP to the event metadata
+       pod_ip = self.pod_ips.get(pod_data['metadata']['uid'])
        ev = ovn_k8s.processor.PodEvent(event_type,
                                        source=pod_name,
-                                       metadata=pod_data)
+                                       metadata=(pod_data, pod_ip))
        conn_processor.get_event_queue().put(ev)
 
    def _send_policy_event(self, event_type, pod_name, pod_data):
@@ -68,9 +71,14 @@ def _process_pod_event(self, event):
        if event_type != 'DELETED' and not pod_data['spec'].get('nodeName'):
            return
 
+       # If a pod has an IP, save it
+       if pod_data['metadata']['uid'] not in self.pod_ips:
+           pod_ip = pod_data['status'].get('podIP')
+           if pod_ip:
+               self.pod_ips[pod_data['metadata']['uid']] = pod_ip
+
        cache_key = "%s_%s" % (namespace, pod_name)
        cached_pod = self.pod_cache.get(cache_key, {})
-       self._update_pod_cache(event_type, cache_key, pod_data)
 
        has_conn_event = False
        label_changes = False
@@ -83,6 +91,7 @@ def _process_pod_event(self, event):
            pod_data['metadata'].get('labels', {}),
            cached_pod['metadata'].get('labels', {}))
 
+       self._update_pod_cache(event_type, cache_key, pod_data)
        if has_conn_event:
            vlog.dbg("Sending connectivity event for event %s on pod %s"
                     % (event_type, pod_name))
