Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

overlay: Maintain a cache for k8s container port name. #87

Merged
merged 1 commit into from
Mar 1, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 122 additions & 19 deletions ovn_k8s/modes/overlay.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class OvnNB(object):
def __init__(self):
self.service_cache = {}
self.logical_switch_cache = {}
# Maintains mapping between k8s container L4 port name and its number.
self.port_name_cache = {}

def _update_service_cache(self, event_type, cache_key, service_data):
# Remove item from cache if it was deleted.
Expand All @@ -36,8 +38,8 @@ def _update_service_cache(self, event_type, cache_key, service_data):
# Update cache
self.service_cache[cache_key] = service_data

def _create_load_balancer_vip(self, load_balancer, service_ip, ips, port,
target_port, protocol):
def _create_load_balancer_vip(self, namespace, load_balancer, service_ip,
ips, port, target_port, protocol):
# With service_ip:port as a VIP, create an entry in 'load_balancer'

vlog.dbg("received event to create/modify load_balancer (%s) vip "
Expand All @@ -57,9 +59,36 @@ def _create_load_balancer_vip(self, load_balancer, service_ip, ips, port,
vlog.err("_create_load_balancer_vip remove: (%s)" % (str(e)))
return

# target is of the form "IP1:port, IP2:port, IP3:port"
target_endpoints = ",".join(["%s:%s" % (ip, target_port)
for ip in ips])
if target_port.isdigit():
# target is of the form "IP1:port, IP2:port, IP3:port"
target_endpoints = ",".join(["%s:%s" % (ip, target_port)
for ip in ips])
else:
# 'target_port' is a string. We should get its number
# from the cache.
if not self.port_name_cache.get(namespace):
vlog.warn("targetPort of %s in ns %s does not have an "
"associated port. Ignoring endpoint creation."
% (target_port, namespace))
return
target_endpoint_list = []
for ip in ips:
if not self.port_name_cache[namespace].get(ip):
continue

num_port = self.port_name_cache[namespace][ip].get(target_port)
if not num_port:
continue

target_endpoint_list.append("%s:%s" % (ip, num_port))

if not target_endpoint_list:
vlog.warn("targetPort of %s in ns %s does not have any "
"associated ports. Ignoring endpoint creation."
% (target_port, namespace))
return
target_endpoints = ",".join(target_endpoint_list)

target = "\"" + target_endpoints + "\""

try:
Expand Down Expand Up @@ -99,7 +128,8 @@ def _get_ovn_gateways(self):

return physical_gateways

def _create_gateways_vip(self, ips, port, target_port, protocol):
def _create_gateways_vip(self, namespace, ips, port, target_port,
protocol):
# Each gateway has a separate load-balancer for N/S traffic

physical_gateways = self._get_ovn_gateways()
Expand Down Expand Up @@ -142,11 +172,12 @@ def _create_gateways_vip(self, ips, port, target_port, protocol):

# With the physical_ip:port as the VIP, add an entry in
# 'load_balancer'.
self._create_load_balancer_vip(load_balancer, physical_ip, ips,
self._create_load_balancer_vip(namespace, load_balancer,
physical_ip, ips,
port, target_port, protocol)

def _create_cluster_vip(self, service_ip, ips, port, target_port,
protocol):
def _create_cluster_vip(self, namespace, service_ip, ips, port,
target_port, protocol):
# Add a VIP in the cluster load-balancer.

if protocol == "TCP":
Expand All @@ -157,10 +188,10 @@ def _create_cluster_vip(self, service_ip, ips, port, target_port,
return

# With service_ip:port as the VIP, add an entry in 'load_balancer'
self._create_load_balancer_vip(load_balancer, service_ip, ips, port,
target_port, protocol)
self._create_load_balancer_vip(namespace, load_balancer, service_ip,
ips, port, target_port, protocol)

def _create_external_vip(self, external_ip, ips,
def _create_external_vip(self, namespace, external_ip, ips,
port, target_port, protocol):
# With external_ip:port as the VIP, create an entry in a gateway
# load-balancer.
Expand Down Expand Up @@ -188,8 +219,8 @@ def _create_external_vip(self, external_ip, ips,
% (physical_gateway))

# With external_ip:port as VIP, add an entry in 'load_balancer'.
self._create_load_balancer_vip(load_balancer, external_ip, ips,
port, target_port, protocol)
self._create_load_balancer_vip(namespace, load_balancer, external_ip,
ips, port, target_port, protocol)

def _get_switch_gateway_ip(self, logical_switch):
cached_logical_switch = self.logical_switch_cache.get(logical_switch,
Expand Down Expand Up @@ -219,6 +250,44 @@ def _get_switch_gateway_ip(self, logical_switch):
gateway_ip_mask}
return (gateway_ip, mask)

def _add_k8s_l4_port_name_cache(self, data, ip_address):
# We maintain a port_name_cache because when k8s services are created
# with port names as targets, we need the corresponding L4 port
# numbers.
namespace = data['metadata']['namespace']
containers = data['spec'].get('containers')
if not containers:
return

# Example content of 'port_name_cache' is
# {'default': {'192.168.1.2': {'is-http': '80', 'is-db': 90'}}}
if namespace not in self.port_name_cache:
self.port_name_cache[namespace] = {}

if ip_address not in self.port_name_cache[namespace]:
self.port_name_cache[namespace][ip_address] = {}

for container in containers:
ports = container.get('ports')
if not ports:
continue

for port in ports:
container_port = port.get("containerPort")
port_name = port.get("name")
if container_port and port_name:
self.port_name_cache[namespace][ip_address][port_name] = \
container_port

def _delete_k8s_l4_port_name_cache(self, data, ip_address):
namespace = data['metadata']['namespace']

if namespace not in self.port_name_cache:
return

if ip_address in self.port_name_cache[namespace]:
del self.port_name_cache[namespace][ip_address]

def create_logical_port(self, event):
data = event.metadata
logical_switch = data['spec']['nodeName']
Expand Down Expand Up @@ -280,6 +349,25 @@ def create_logical_port(self, event):

vlog.info("created logical port %s" % (logical_port))

self._add_k8s_l4_port_name_cache(data, ip_address)

def _get_ip_address_from_annotations(self, annotations):
if not annotations.get("ovn"):
return None

ovn_annotated_dict = ast.literal_eval(annotations['ovn'])
ip_address_mask = ovn_annotated_dict.get('ip_address')
if not ip_address_mask:
return None

ip_address = None
try:
(ip_address, mask) = ip_address_mask.split("/")
except Exception:
vlog.err("_get_ip_address_from_annotations: failed to get ip")

return ip_address

def delete_logical_port(self, event):
data = event.metadata
pod_name = data['metadata']['name']
Expand All @@ -290,6 +378,11 @@ def delete_logical_port(self, event):
"unable to delete logical port" % data)
return

annotations = data['metadata']['annotations']
ip_address = self._get_ip_address_from_annotations(annotations)
if ip_address:
self._delete_k8s_l4_port_name_cache(data, ip_address)

try:
ovn_nbctl("--if-exists", "lsp-del", logical_port)
except Exception:
Expand All @@ -300,6 +393,7 @@ def delete_logical_port(self, event):

def _update_vip(self, service_data, ips):
service_type = service_data['spec'].get('type')
namespace = service_data['metadata']['namespace']

service_ip = service_data['spec'].get('clusterIP')
if not service_ip:
Expand All @@ -321,24 +415,25 @@ def _update_vip(self, service_data, ips):
continue

protocol = service_port.get('protocol', 'TCP')
target_port = service_port.get('targetPort', port)
target_port = str(service_port.get('targetPort', port))

if service_type == "NodePort":
# Add the 'NodePort' to a load-balancer instantiated in
# gateways.
self._create_gateways_vip(ips, port, target_port, protocol)
self._create_gateways_vip(namespace, ips, port, target_port,
protocol)
elif service_type == "ClusterIP":
# Add the 'service_ip:port' as a VIP in the cluster
# load-balancer.
self._create_cluster_vip(service_ip, ips, port,
self._create_cluster_vip(namespace, service_ip, ips, port,
target_port, protocol)

if external_ips:
for external_ip in external_ips:
# Add 'external_ip:port' as a VIP in a gateway
# load-balancer.
self._create_external_vip(external_ip, ips, port,
target_port, protocol)
self._create_external_vip(namespace, external_ip, ips,
port, target_port, protocol)

def update_vip(self, event):
service_data = event.metadata
Expand Down Expand Up @@ -403,8 +498,16 @@ def sync_pods(self, pods):
pod_name = pod['metadata']['name']
namespace = pod['metadata']['namespace']
logical_port = "%s_%s" % (namespace, pod_name)
annotations = pod['metadata']['annotations']
expected_logical_ports.add(logical_port)

# We should sync the container port names as there are no
# guarantees that a endpoint creation event will become after
# all the pods creation events.
ip_address = self._get_ip_address_from_annotations(annotations)
if ip_address:
self._add_k8s_l4_port_name_cache(pod, ip_address)

try:
existing_logical_ports = ovn_nbctl(
"--data=bare", "--no-heading",
Expand Down