Skip to content

Commit

Permalink
Merge pull request #848 from nbartos/fip
Browse files Browse the repository at this point in the history
Support floating IPs via NAT.
  • Loading branch information
Neil Jerram authored and Neil Jerram committed Mar 15, 2016
2 parents 109a732 + e57c564 commit c86ccd2
Show file tree
Hide file tree
Showing 14 changed files with 544 additions and 57 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -1,5 +1,9 @@
# Changelog

## 1.4.0-pre1

- Add floating IP support (via 1:1 NAT) in Felix.

## 1.3.0

- Felix now parses the etcd snapshot in parallel with the event stream;
Expand Down
52 changes: 49 additions & 3 deletions calico/common.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-

# Copyright (c) 2014, 2015 Metaswitch Networks
# All Rights Reserved.
# Copyright (c) 2014, 2015 Metaswitch Networks. All Rights Reserved.
# Copyright (c) 2015 Cisco Systems. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
Expand Down Expand Up @@ -32,6 +32,7 @@
import netaddr
import netaddr.core
from netaddr.strategy import eui48
from calico.felix.futils import IPV4, IP_TYPE_TO_VERSION

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -291,6 +292,13 @@ class ValidationFailed(Exception):
pass


def nat_key(ip_type):
if ip_type in [IPV4, IP_TYPE_TO_VERSION[IPV4]]:
return "ipv4_nat"
else:
return "ipv6_nat"


def validate_endpoint(config, combined_id, endpoint):
"""
Ensures that the supplied endpoint is valid. Once this routine has returned
Expand Down Expand Up @@ -362,7 +370,7 @@ def validate_endpoint(config, combined_id, endpoint):
canonical_nws = []
nets_list = endpoint.get(nets, [])
if not isinstance(nets_list, list):
issues.append("%s should be a list" % nets)
issues.append("%s should be a list." % nets)
else:
for ip in nets_list:
if not validate_cidr(ip, version):
Expand All @@ -373,6 +381,44 @@ def validate_endpoint(config, combined_id, endpoint):
canonical_nws.append(canonicalise_cidr(ip, version))
endpoint[nets] = canonical_nws

n_key = nat_key(version)
nat_maps = endpoint.get(n_key, None)
if nat_maps is not None:
if isinstance(nat_maps, list):
canonical_nm = []
for nat_map in nat_maps:
canonical = {}
for t in "int", "ext":
canonical[t] = None
ip = nat_map.get("%s_ip" % t, None)
if ip:
if validate_ip_addr(ip, version):
canonical[t] = canonicalise_ip(ip, version)
else:
issues.append("%s_ip (%r) is not a valid IPv%d"
" address." % (t, ip, version))
else:
issues.append("%s_ip was not specified a %s entry."
% (t, n_key))
if canonical["int"] and canonical["ext"]:
canonical_nm.append({"int_ip": canonical["int"],
"ext_ip": canonical["ext"]})
endpoint[n_key] = canonical_nm

for nat_map in canonical_nm:
if version == 4:
nm = "/32"
else:
nm = "/128"
int_ip_nm = nat_map["int_ip"] + nm
# At this point these have all been canonicalized, so we
# should be able to do a strict string comparison.
if int_ip_nm not in endpoint[nets]:
issues.append("int_ip %s is not listed in %s." %
(int_ip_nm, nets))
else:
issues.append("%s should be a list." % n_key)

gw_key = "ipv%d_gateway" % version
try:
gw_str = endpoint[gw_key]
Expand Down
37 changes: 33 additions & 4 deletions calico/felix/endpoint.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2015 Metaswitch Networks
# All Rights Reserved.
# Copyright (c) 2015 Metaswitch Networks. All Rights Reserved.
# Copyright (c) 2015 Cisco Systems. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
Expand All @@ -20,6 +20,7 @@
Endpoint management.
"""
import logging
from calico.common import nat_key
from calico.datamodel_v1 import (
ENDPOINT_STATUS_UP, ENDPOINT_STATUS_DOWN, ENDPOINT_STATUS_ERROR
)
Expand All @@ -40,6 +41,7 @@ def __init__(self, config, ip_type,
iptables_updater,
dispatch_chains,
rules_manager,
fip_manager,
status_reporter):
super(EndpointManager, self).__init__(qualifier=ip_type)

Expand All @@ -53,6 +55,7 @@ def __init__(self, config, ip_type,
self.dispatch_chains = dispatch_chains
self.rules_mgr = rules_manager
self.status_reporter = status_reporter
self.fip_manager = fip_manager

# All endpoint dicts that are on this host.
self.endpoints_by_id = {}
Expand All @@ -75,6 +78,7 @@ def _create(self, combined_id):
self.iptables_updater,
self.dispatch_chains,
self.rules_mgr,
self.fip_manager,
self.status_reporter)

def _on_object_started(self, endpoint_id, obj):
Expand All @@ -89,7 +93,8 @@ def _on_object_started(self, endpoint_id, obj):
def on_datamodel_in_sync(self):
if not self._data_model_in_sync:
_log.info("%s: First time we've been in-sync with the datamodel,"
"sending snapshot to DispatchChains.", self)
"sending snapshot to DispatchChains and FIPManager.",
self)
self._data_model_in_sync = True

# Tell the dispatch chains about the local endpoints in advance so
Expand All @@ -102,6 +107,14 @@ def on_datamodel_in_sync(self):
local_ifaces = frozenset(self.endpoint_id_by_iface_name.keys())
self.dispatch_chains.apply_snapshot(local_ifaces, async=True)

nat_maps = {}
for ep_id, ep in self.endpoints_by_id.iteritems():
if ep_id in self.local_endpoint_ids:
nat_map = ep.get(nat_key(self.ip_type), None)
if nat_map:
nat_maps[ep_id] = nat_map
self.fip_manager.apply_snapshot(nat_maps, async=True)

@actor_message()
def on_endpoint_update(self, endpoint_id, endpoint, force_reprogram=False):
"""
Expand Down Expand Up @@ -169,7 +182,7 @@ def on_interface_update(self, name, iface_up):
class LocalEndpoint(RefCountedActor):

def __init__(self, config, combined_id, ip_type, iptables_updater,
dispatch_chains, rules_manager, status_reporter):
dispatch_chains, rules_manager, fip_manager, status_reporter):
"""
Controls a single local endpoint.
Expand All @@ -178,6 +191,7 @@ def __init__(self, config, combined_id, ip_type, iptables_updater,
:param iptables_updater: IptablesUpdater to use
:param dispatch_chains: DispatchChains to use
:param rules_manager: RulesManager to use
:param fip_manager: FloatingIPManager to use
"""
super(LocalEndpoint, self).__init__(qualifier="%s(%s)" %
(combined_id.endpoint, ip_type))
Expand All @@ -195,6 +209,7 @@ def __init__(self, config, combined_id, ip_type, iptables_updater,
self.dispatch_chains = dispatch_chains
self.rules_mgr = rules_manager
self.status_reporter = status_reporter
self.fip_manager = fip_manager

# Helper for acquiring/releasing profiles.
self.rules_ref_helper = RefHelper(self, rules_manager,
Expand Down Expand Up @@ -449,6 +464,12 @@ def _apply_endpoint_update(self):
# table.
_log.debug("IP addresses changed, need to update routing")
self._device_in_sync = False
for key in "ipv4_nat", "ipv6_nat":
if (self.endpoint.get(key, None) !=
pending_endpoint.get(key, None)):
_log.debug("NAT mappings have changed, refreshing.")
self._device_in_sync = False
self._iptables_in_sync = False
else:
# Delete of the endpoint. Need to resync everything.
profile_ids = set()
Expand All @@ -472,12 +493,16 @@ def _update_chains(self):
self.endpoint["profile_ids"])
try:
self.iptables_updater.rewrite_chains(updates, deps, async=False)
self.fip_manager.update_endpoint(self.combined_id,
self.endpoint.get(nat_key(self.ip_type), None), async=True)
except FailedSystemCall:
_log.exception("Failed to program chains for %s. Removing.", self)
try:
self.iptables_updater.delete_chains(
self.iptables_generator.endpoint_chain_names(self._suffix),
async=False)
self.fip_manager.update_endpoint(self.combined_id, None,
async=True)
except FailedSystemCall:
_log.exception("Failed to remove chains after original "
"failure")
Expand All @@ -490,6 +515,8 @@ def _remove_chains(self):
self.iptables_updater.delete_chains(
self.iptables_generator.endpoint_chain_names(self._suffix),
async=False)
self.fip_manager.update_endpoint(self.combined_id, None,
async=True)
except FailedSystemCall:
_log.exception("Failed to delete chains for %s", self)
else:
Expand All @@ -516,6 +543,8 @@ def _configure_interface(self):
ips = set()
for ip in self.endpoint.get(self.nets_key, []):
ips.add(futils.net_to_ip(ip))
for nat_map in self.endpoint.get(nat_key(self.ip_type), []):
ips.add(nat_map['ext_ip'])
devices.set_routes(self.ip_type, ips,
self._iface_name,
self.endpoint["mac"],
Expand Down
17 changes: 15 additions & 2 deletions calico/felix/felix.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright (c) Metaswitch Networks 2015. All rights reserved.
# Copyright (c) 2015 Cisco Systems. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
Expand Down Expand Up @@ -45,6 +46,7 @@
from calico.felix.endpoint import EndpointManager
from calico.felix.ipsets import IpsetManager, IpsetActor, HOSTS_IPSET_V4
from calico.felix.masq import MasqueradeManager
from calico.felix.fipmanager import FloatingIPManager
from calico.felix.fetcd import EtcdAPI

_log = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,27 +84,32 @@ def _main_greenlet(config):
v4_filter_updater,
v4_ipset_mgr)
v4_dispatch_chains = DispatchChains(config, 4, v4_filter_updater)
v4_fip_manager = FloatingIPManager(config, 4, v4_nat_updater)
v4_ep_manager = EndpointManager(config,
IPV4,
v4_filter_updater,
v4_dispatch_chains,
v4_rules_manager,
v4_fip_manager,
etcd_api.status_reporter)

v6_raw_updater = IptablesUpdater("raw", ip_version=6, config=config)
v6_filter_updater = IptablesUpdater("filter", ip_version=6,
config=config)
v6_nat_updater = IptablesUpdater("nat", ip_version=6, config=config)
v6_ipset_mgr = IpsetManager(IPV6, config)
v6_rules_manager = RulesManager(config,
6,
v6_filter_updater,
v6_ipset_mgr)
v6_dispatch_chains = DispatchChains(config, 6, v6_filter_updater)
v6_fip_manager = FloatingIPManager(config, 6, v6_nat_updater)
v6_ep_manager = EndpointManager(config,
IPV6,
v6_filter_updater,
v6_dispatch_chains,
v6_rules_manager,
v6_fip_manager,
etcd_api.status_reporter)
cleanup_mgr = CleanupManager(
config,
Expand All @@ -116,6 +123,7 @@ def _main_greenlet(config):
v4_ep_manager, v6_ep_manager,
v6_raw_updater,
v4_masq_manager,
v4_nat_updater, v6_nat_updater,
cleanup_mgr,
]
)
Expand All @@ -132,35 +140,40 @@ def _main_greenlet(config):
v4_rules_manager.start()
v4_dispatch_chains.start()
v4_ep_manager.start()
v4_fip_manager.start()

v6_raw_updater.start()
v6_filter_updater.start()
v6_nat_updater.start()
v6_ipset_mgr.start()
v6_rules_manager.start()
v6_dispatch_chains.start()
v6_ep_manager.start()
v6_fip_manager.start()

iface_watcher.start()

top_level_actors = [
hosts_ipset_v4,
cleanup_mgr,

v4_nat_updater,
v4_filter_updater,
v4_nat_updater,
v4_ipset_mgr,
v4_masq_manager,
v4_rules_manager,
v4_dispatch_chains,
v4_ep_manager,
v4_fip_manager,

v6_raw_updater,
v6_filter_updater,
v6_nat_updater,
v6_ipset_mgr,
v6_rules_manager,
v6_dispatch_chains,
v6_ep_manager,
v6_fip_manager,

iface_watcher,
etcd_api,
Expand All @@ -171,7 +184,7 @@ def _main_greenlet(config):
# Install the global rules before we start polling for updates.
_log.info("Installing global rules.")
install_global_rules(config, v4_filter_updater, v6_filter_updater,
v4_nat_updater, v6_raw_updater)
v4_nat_updater, v6_nat_updater, v6_raw_updater)

# Start polling for updates. These kicks make the actors poll
# indefinitely.
Expand Down

0 comments on commit c86ccd2

Please sign in to comment.