Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug 1963846: Fix NPs for OVN LBs with hairpin traffic #518

Merged
merged 1 commit into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions kuryr_kubernetes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
K8S_API_CRD_KURYRPORTS = K8S_API_CRD + '/kuryrports'
K8S_API_POLICIES = '/apis/networking.k8s.io/v1/networkpolicies'
K8S_API_NETWORKING = '/apis/networking.k8s.io/v1'
K8S_API_NETWORKING_NAMESPACES = K8S_API_NETWORKING + '/namespaces'

K8S_API_NPWG_CRD = '/apis/k8s.cni.cncf.io/v1'

Expand Down
62 changes: 56 additions & 6 deletions kuryr_kubernetes/controller/drivers/network_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class NetworkPolicyDriver(base.NetworkPolicyDriver):
"""Provide security groups actions based on K8s Network Policies"""

def __init__(self):
super().__init__()
self.os_net = clients.get_network_client()
self.kubernetes = clients.get_kubernetes_client()

Expand Down Expand Up @@ -131,9 +132,52 @@ def _get_security_group_rules_from_network_policy(self, policy):
i_rules, e_rules = self.parse_network_policy_rules(policy)
# Add default rules to allow traffic from host and svc subnet
i_rules += self._get_default_np_rules()
# Add rules allowing ingress from LBs
# FIXME(dulek): Rules added below cannot work around the Amphora
# source-ip problem as Amphora does not use LB VIP for
# LB->members traffic, but that other IP attached to the
# Amphora VM in the service subnet. It's ridiculous.
i_rules += self._get_service_ingress_rules(policy)

return i_rules, e_rules

def _get_service_ingress_rules(self, policy):
"""Get SG rules allowing traffic from Services in the namespace

This methods returns ingress rules allowing traffic from all
services clusterIPs in the cluster. This is required for OVN LBs in
order to work around the fact that it changes source-ip to LB IP in
hairpin traffic. This shouldn't be a security problem as this can only
happen when the pod receiving the traffic is the one that calls the
service.

FIXME(dulek): Once OVN supports selecting a single, configurable
source-IP for hairpin traffic, consider using it instead.
"""
if CONF.octavia_defaults.enforce_sg_rules:
# When enforce_sg_rules is True, one of the default rules will
# open ingress from all the services subnets, so those rules would
# be redundant.
return []

ns = policy['metadata']['namespace']
rules = []
services = self.kubernetes.get(
f'{constants.K8S_API_NAMESPACES}/{ns}/services').get('items', [])
for svc in services:
if svc['metadata'].get('deletionTimestamp'):
# Ignore services being deleted
continue
ip = svc['spec'].get('clusterIP')
if not ip or ip == 'None':
# Ignore headless services
continue
rules.append(driver_utils.create_security_group_rule_body(
'ingress', cidr=ip,
description=f"Allow traffic from local namespace service "
f"{svc['metadata']['name']}"))
return rules

def _get_default_np_rules(self):
"""Add extra SG rule to allow traffic from svcs and host.

Expand Down Expand Up @@ -557,6 +601,8 @@ def _parse_sg_rules(self, sg_rule_body_list, direction, policy):

def _create_svc_egress_sg_rule(self, policy_namespace, sg_rule_body_list,
resource=None, port=None, protocol=None):
# FIXME(dulek): We could probably filter by namespace here for pods
# and namespace resources?
services = driver_utils.get_services()
if not resource:
svc_subnet = utils.get_subnet_cidr(
Expand All @@ -568,6 +614,15 @@ def _create_svc_egress_sg_rule(self, policy_namespace, sg_rule_body_list,
return

for service in services.get('items'):
if service['metadata'].get('deletionTimestamp'):
# Ignore services being deleted
continue

cluster_ip = service['spec'].get('clusterIP')
if not cluster_ip or cluster_ip == 'None':
# Headless services has 'None' as clusterIP.
continue

if self._is_pod(resource):
pod_labels = resource['metadata'].get('labels')
svc_selector = service['spec'].get('selector')
Expand All @@ -592,13 +647,8 @@ def _create_svc_egress_sg_rule(self, policy_namespace, sg_rule_body_list,
ns_name = service['metadata']['namespace']
if ns_name != resource['metadata']['name']:
continue
cluster_ip = service['spec'].get('clusterIP')
if not cluster_ip or cluster_ip == 'None':
# Headless services has 'None' as clusterIP.
continue
rule = driver_utils.create_security_group_rule_body(
'egress', port, protocol=protocol,
cidr=cluster_ip)
'egress', port, protocol=protocol, cidr=cluster_ip)
if rule not in sg_rule_body_list:
sg_rule_body_list.append(rule)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid

from oslo_config import cfg
from oslo_log import log as logging

Expand Down Expand Up @@ -44,20 +42,6 @@ def _get_namespace_labels(namespace):
return namespaces['metadata'].get('labels')


def _bump_networkpolicy(knp):
kubernetes = clients.get_kubernetes_client()

try:
kubernetes.annotate(
knp['metadata']['annotations']['networkPolicyLink'],
{constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())})
except exceptions.K8sResourceNotFound:
raise
except exceptions.K8sClientException:
LOG.exception("Kubernetes Client Exception")
raise


def _create_sg_rules_with_container_ports(container_ports, matched):
"""Checks if security group rules based on container ports will be updated

Expand Down Expand Up @@ -321,7 +305,7 @@ def create_sg_rules(self, pod):

if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue
Expand Down Expand Up @@ -350,7 +334,7 @@ def delete_sg_rules(self, pod):

if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue
Expand Down Expand Up @@ -384,7 +368,7 @@ def delete_namespace_sg_rules(self, namespace):

if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue
Expand All @@ -407,7 +391,7 @@ def create_namespace_sg_rules(self, namespace):

if i_matched or e_matched:
try:
_bump_networkpolicy(crd)
driver_utils.bump_networkpolicy(crd)
except exceptions.K8sResourceNotFound:
# The NP got deleted, ignore it.
continue
Expand Down
40 changes: 39 additions & 1 deletion kuryr_kubernetes/controller/drivers/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# under the License.

import urllib
import uuid

import netaddr
from openstack import exceptions as os_exc
Expand Down Expand Up @@ -336,7 +337,7 @@ def get_networkpolicies(namespace=None):
try:
if namespace:
np_path = '{}/{}/networkpolicies'.format(
constants.K8S_API_CRD_NAMESPACES, namespace)
constants.K8S_API_NETWORKING_NAMESPACES, namespace)
else:
np_path = constants.K8S_API_POLICIES
nps = kubernetes.get(np_path)
Expand Down Expand Up @@ -592,3 +593,40 @@ def get_port_annot_pci_info(nodename, neutron_port):
LOG.exception('Exception when reading annotations '
'%s and converting from json', annot_name)
return pci_info


def bump_networkpolicy(knp):
kubernetes = clients.get_kubernetes_client()

try:
kubernetes.annotate(
knp['metadata']['annotations']['networkPolicyLink'],
{constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())})
except k_exc.K8sResourceNotFound:
raise
except k_exc.K8sClientException:
LOG.exception("Failed to annotate network policy %s to force its "
"recalculation.", utils.get_res_unique_name(knp))
raise


def bump_networkpolicies(namespace=None):
k8s = clients.get_kubernetes_client()
nps = get_networkpolicies(namespace)
for np in nps:
try:
k8s.annotate(np['metadata']['selfLink'],
{constants.K8S_ANNOTATION_POLICY: str(uuid.uuid4())})
except k_exc.K8sResourceNotFound:
# Ignore if NP got deleted.
pass
except k_exc.K8sClientException:
LOG.warning("Failed to annotate network policy %s to force its "
"recalculation.", utils.get_res_unique_name(np))
continue


def is_network_policy_enabled():
enabled_handlers = CONF.kubernetes.enabled_handlers
svc_sg_driver = CONF.kubernetes.service_security_groups_driver
return 'policy' in enabled_handlers and svc_sg_driver == 'policy'
11 changes: 3 additions & 8 deletions kuryr_kubernetes/controller/handlers/kuryrnetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self):
self._drv_vif_pool = drivers.VIFPoolDriver.get_instance(
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
if self._is_network_policy_enabled():
if driver_utils.is_network_policy_enabled():
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
self._drv_svc_sg = (
drivers.ServiceSecurityGroupsDriver.get_instance())
Expand Down Expand Up @@ -81,7 +81,7 @@ def on_present(self, kuryrnet_crd):
# update SG and svc SGs
namespace = driver_utils.get_namespace(ns_name)
crd_selectors = self._drv_sg.update_namespace_sg_rules(namespace)
if (self._is_network_policy_enabled() and crd_selectors and
if (driver_utils.is_network_policy_enabled() and crd_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_selectors, project_id)
Expand Down Expand Up @@ -110,7 +110,7 @@ def on_finalize(self, kuryrnet_crd):
'metadata': {'name': kuryrnet_crd['spec']['nsName']}}
crd_selectors = self._drv_sg.delete_namespace_sg_rules(namespace)

if (self._is_network_policy_enabled() and crd_selectors and
if (driver_utils.is_network_policy_enabled() and crd_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
project_id = kuryrnet_crd['spec']['projectId']
services = driver_utils.get_services()
Expand All @@ -126,11 +126,6 @@ def on_finalize(self, kuryrnet_crd):
kuryrnet_crd)
raise

def _is_network_policy_enabled(self):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')

def _update_services(self, services, crd_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(
Expand Down
4 changes: 3 additions & 1 deletion kuryr_kubernetes/controller/handlers/kuryrnetworkpolicy.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ def on_present(self, knp):
# by the policy
# FIXME(dulek): Make sure to include svcs without selector when
# we start supporting them.
if (not service['spec'].get('selector') or not
# NOTE(dulek): Skip services being deleted.
if (not service['spec'].get('selector') or
service['metadata'].get('deletionTimestamp') or not
self._is_service_affected(service, pods_to_update)):
continue
sgs = self._drv_svc_sg.get_security_groups(service, project_id)
Expand Down
12 changes: 4 additions & 8 deletions kuryr_kubernetes/controller/handlers/kuryrport.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def __init__(self):
specific_driver='multi_pool')
self._drv_vif_pool.set_vif_driver()
self._drv_multi_vif = drivers.MultiVIFDriver.get_enabled_drivers()
if self._is_network_policy_enabled():
if driver_utils.is_network_policy_enabled():
self._drv_lbaas = drivers.LBaaSDriver.get_instance()
self._drv_svc_sg = (drivers.ServiceSecurityGroupsDriver
.get_instance())
Expand Down Expand Up @@ -122,7 +122,8 @@ def on_present(self, kuryrport_crd):
self._record_pod_creation_metric(pod)
except Exception:
LOG.debug("Failed to record metric for pod %s", name)
if self._is_network_policy_enabled():

if driver_utils.is_network_policy_enabled():
crd_pod_selectors = self._drv_sg.create_sg_rules(pod)
if oslo_cfg.CONF.octavia_defaults.enforce_sg_rules:
services = driver_utils.get_services()
Expand Down Expand Up @@ -193,7 +194,7 @@ def on_finalize(self, kuryrport_crd):
vif = objects.base.VersionedObject.obj_from_primitive(data['vif'])
self._drv_vif_pool.release_vif(pod, vif, project_id,
security_groups)
if (self._is_network_policy_enabled() and crd_pod_selectors and
if (driver_utils.is_network_policy_enabled() and crd_pod_selectors and
oslo_cfg.CONF.octavia_defaults.enforce_sg_rules):
services = driver_utils.get_services()
self._update_services(services, crd_pod_selectors, project_id)
Expand Down Expand Up @@ -283,11 +284,6 @@ def _update_kuryrport_crd(self, kuryrport_crd, vifs):
self.k8s.patch_crd('status', kuryrport_crd['metadata']['selfLink'],
{'vifs': vif_dict})

def _is_network_policy_enabled(self):
enabled_handlers = oslo_cfg.CONF.kubernetes.enabled_handlers
svc_sg_driver = oslo_cfg.CONF.kubernetes.service_security_groups_driver
return ('policy' in enabled_handlers and svc_sg_driver == 'policy')

def _update_services(self, services, crd_pod_selectors, project_id):
for service in services.get('items'):
if not driver_utils.service_matches_affected_pods(
Expand Down
11 changes: 11 additions & 0 deletions kuryr_kubernetes/controller/handlers/lbaas.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from kuryr_kubernetes import config
from kuryr_kubernetes import constants as k_const
from kuryr_kubernetes.controller.drivers import base as drv_base
from kuryr_kubernetes.controller.drivers import utils as driver_utils
from kuryr_kubernetes import exceptions as k_exc
from kuryr_kubernetes.handlers import k8s_base
from kuryr_kubernetes.objects import lbaas as obj_lbaas
Expand All @@ -47,6 +48,10 @@ def __init__(self):
self._drv_subnets = drv_base.ServiceSubnetsDriver.get_instance()
self._drv_sg = drv_base.ServiceSecurityGroupsDriver.get_instance()

def _bump_network_policies(self, svc):
if driver_utils.is_network_policy_enabled():
driver_utils.bump_networkpolicies(svc['metadata']['namespace'])

def on_present(self, service):
reason = self._should_ignore(service)
if reason:
Expand All @@ -64,6 +69,9 @@ def on_present(self, service):

if loadbalancer_crd is None:
try:
# Bump all the NPs in the namespace to force SG rules
# recalculation.
self._bump_network_policies(service)
self.create_crd_spec(service)
except k_exc.K8sNamespaceTerminating:
LOG.warning('Namespace %s is being terminated, ignoring '
Expand Down Expand Up @@ -111,6 +119,9 @@ def on_finalize(self, service):

klb_crd_path = (f"{k_const.K8S_API_CRD_NAMESPACES}/"
f"{svc_namespace}/kuryrloadbalancers/{svc_name}")
# Bump all the NPs in the namespace to force SG rules
# recalculation.
self._bump_network_policies(service)
try:
k8s.delete(klb_crd_path)
except k_exc.K8sResourceNotFound:
Expand Down