Read podCidr from k8s API when using host-local
caseydavenport committed May 23, 2016
1 parent ac558a7 commit 829d032
Showing 5 changed files with 124 additions and 17 deletions.
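For context, the change is driven entirely by the CNI network config. A hypothetical config dict, as this plugin would receive it, might look like the sketch below; the "ipam" and "kubernetes" keys are the ones the new code reads, and the network name, kubeconfig path, and node name are illustrative values, not taken from this commit.

network_config = {
    "name": "calico-k8s-network",          # illustrative
    "type": "calico",
    "ipam": {
        "type": "host-local",
        "subnet": "usePodCidr"             # replaced at runtime with the node's podCIDR
    },
    "kubernetes": {
        "kubeconfig": "/etc/kubernetes/kubeconfig",   # required for the API lookup
        "node_name": "k8s-node-1"          # optional; defaults to socket.gethostname()
    }
}

With a config like this, the plugin swaps "usePodCidr" for the node's actual podCIDR before invoking the host-local IPAM binary.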
7 changes: 4 additions & 3 deletions Makefile
@@ -46,13 +46,13 @@ deploy-rkt: dist/calicoctl
ut: update-version
docker run --rm -v `pwd`:/code \
calico/test \
nosetests tests/unit -c nose.cfg
sh -c "pip install pykube && nosetests tests/unit -c nose.cfg"

# Run the fv tests.
fv: update-version
docker run --rm -v `pwd`:/code \
calico/test \
nosetests tests/fv -c nose.cfg
sh -c "pip install pykube && nosetests tests/fv -c nose.cfg"

# Run tests on Circle CI.
test-circle: update-version dist/calico dist/calico-ipam
@@ -63,7 +63,8 @@ test-circle: update-version dist/calico dist/calico-ipam
-v $(CIRCLE_TEST_REPORTS):/circle_output \
-e COVERALLS_REPO_TOKEN=$(COVERALLS_REPO_TOKEN) \
calico/test sh -c \
'nosetests tests -c nose.cfg \
'pip install pykube && \
nosetests tests -c nose.cfg \
--with-xunit --xunit-file=/circle_output/output.xml; RC=$$?;\
[[ ! -z "$$COVERALLS_REPO_TOKEN" ]] && coveralls || true; exit $$RC'

81 changes: 81 additions & 0 deletions calico.py
@@ -29,6 +29,11 @@
ETCD_ENDPOINTS_ENV)
from pycalico.datastore_errors import MultipleEndpointsMatch

from pykube.config import KubeConfig
from pykube.http import HTTPClient
from pykube.objects import Node
from pykube.query import Query

from calico_cni import __version__, __commit__, __branch__
from calico_cni.util import (configure_logging, parse_cni_args, print_cni_error,
handle_datastore_error, CniError)
@@ -143,6 +148,9 @@ def __init__(self, network_config, env):
else:
self.workload_id = self.container_id
self.orchestrator_id = "cni"
kubernetes_config = network_config.get("kubernetes", {})
self.kubeconfig_path = kubernetes_config.get("kubeconfig")
self.k8s_node_name = kubernetes_config.get("node_name", socket.gethostname())
"""
Configure orchestrator specific settings.
workload_id: In Kubernetes, this is the pod's namespace and name.
@@ -441,14 +449,87 @@ def _call_ipam_plugin(self, env):
"msg": e.msg,
"details": e.details})
code = e.code
elif self.ipam_type == "host-local":
# We've been told to use the "host-local" IPAM plugin.
# Check if we need to use the Kubernetes podCidr for this node, and
# if so replace the subnet field with the correct value.
if self.network_config["ipam"].get("subnet") == "usePodCidr":
if not self.running_under_k8s:
print_cni_error(ERR_CODE_GENERIC, "Invalid network config",
"Must be running under Kubernetes to use \
'subnet: usePodCidr'")
_log.info("Using Kubernetes pod cidr")
pod_cidr = self._get_kubernetes_pod_cidr()
self.network_config["ipam"]["subnet"] = str(pod_cidr)

# Call the IPAM plugin.
_log.debug("Calling host-local IPAM plugin")
code, response = self._call_binary_ipam_plugin(env)
else:
# Using some other IPAM plugin - call it.
_log.debug("Using binary plugin")
code, response = self._call_binary_ipam_plugin(env)

# Return the IPAM return code and output.
_log.debug("IPAM response (rc=%s): %s", code, response)
return code, response

def _get_kubernetes_pod_cidr(self):
"""
Attempt to get the Kubernetes pod CIDR for this node.
First check if we've written it to disk. If so, use that value. If
not, then query the Kubernetes API for it.
"""
local_store_path = os.path.abspath("podCidr.cache")
if os.path.exists(local_store_path):
_log.info("Loading podCidr from disk: %s", local_store_path)
with open(local_store_path, "r") as f:
pod_cidr = f.read().strip()
_log.debug("Loaded podCidr from disk: %s", pod_cidr)
else:
_log.info("Getting node.spec.podCidr from API, kubeconfig: %s",
self.kubeconfig_path)
if not self.kubeconfig_path:
# For now, kubeconfig is the only supported auth method.
print_cni_error(ERR_CODE_GENERIC, "Missing kubeconfig",
"usePodCidr requires specification of kubeconfig file")
sys.exit(ERR_CODE_GENERIC)

# Query the API for this node. Default node name to the hostname.
try:
api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path))
node = None
for n in Node.objects(api):
if n.obj["metadata"]["name"] == self.k8s_node_name:
_log.debug("Checking node: %s", n.obj["metadata"]["name"])
node = n
break
if not node:
raise KeyError("Unable to find node in API: %s" % self.k8s_node_name)
_log.debug("Found node %s: %s: ", node.obj["metadata"]["name"],
node.obj["spec"])
except Exception:
print_cni_error(ERR_CODE_GENERIC, "Error querying Kubernetes API",
"Failed to get podCidr from Kubernetes API")
sys.exit(ERR_CODE_GENERIC)
else:
pod_cidr = node.obj["spec"].get("podCIDR")
if not pod_cidr:
print_cni_error(ERR_CODE_GENERIC, "Missing podCidr",
"No podCidr for node %s" % self.k8s_node_name)
sys.exit(ERR_CODE_GENERIC)
else:
# Success - write to disk so we don't have to
# always query the API for this.
with open(local_store_path, "w") as f:
_log.info("Caching podCidr on disk: %s", local_store_path)
try:
f.write(pod_cidr)
except Exception:
_log.warning("Failed to write %s", local_store_path)
_log.debug("Using podCidr: %s", pod_cidr)
return pod_cidr

def _call_binary_ipam_plugin(self, env):
"""Calls through to the specified IPAM plugin binary.
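The new _get_kubernetes_pod_cidr method above checks a local cache file first and only then queries the Kubernetes API. A minimal standalone sketch of that API lookup, assuming pykube is installed and using a placeholder kubeconfig path and node name:

from pykube.config import KubeConfig
from pykube.http import HTTPClient
from pykube.objects import Node

# Placeholder path and node name for illustration only.
api = HTTPClient(KubeConfig.from_file("/etc/kubernetes/kubeconfig"))
pod_cidr = None
for node in Node.objects(api):
    if node.obj["metadata"]["name"] == "k8s-node-1":
        pod_cidr = node.obj["spec"].get("podCIDR")   # e.g. "10.244.1.0/24"
        break
print(pod_cidr)

The plugin then writes the value to podCidr.cache so subsequent invocations can skip the API round trip.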
7 changes: 5 additions & 2 deletions calico_cni/constants.py
@@ -15,8 +15,11 @@
import re
import socket

# The hostname of the current node.
HOSTNAME = socket.gethostname()

# Regex to parse CNI_ARGS. Looks for key value pairs separated by an equals
# sign and followed by either the end of the string, or a colon (indicating
# sign and followed by either the end of the string, or a colon (indicating
# that there is another CNI_ARG key/value pair).
CNI_ARGS_RE = re.compile("([a-zA-Z0-9/\.\-\_ ]+)=([a-zA-Z0-9/\.\-\_ ]+)(?:;|$)")

@@ -54,7 +57,7 @@
ASSIGN_IPV4_KEY = "assign_ipv4"
ASSIGN_IPV6_KEY = "assign_ipv6"

# Constants for getting policy specific information
# Constants for getting policy specific information
# from the policy dictionary in the network config file.
API_ROOT_KEY = "k8s_api_root"
AUTH_TOKEN_KEY = "k8s_auth_token"
7 changes: 4 additions & 3 deletions calico_cni/policy_drivers.py
@@ -252,15 +252,15 @@ def generate_tags(self):
def _get_api_pod(self):
"""Get the pod resource from the API.
:return: JSON object containing the pod spec
:return: Dictionary representation of Pod from k8s API.
"""
# If kubeconfig was specified, use the pykube library.
if self.kubeconfig_path:
_log.info("Using kubeconfig at %s", self.kubeconfig_path)
try:
api = HTTPClient(KubeConfig.from_file(self.kubeconfig_path))
pod = Query(api, Pod, self.namespace).get_by_name(self.pod_name)
_log.info("Found pod: %s: ", pod.obj)
_log.debug("Found pod: %s: ", pod.obj)
except Exception as e:
raise PolicyException("Error querying Kubernetes API",
details=str(e.message))
@@ -400,6 +400,7 @@ def get_policy_driver(cni_plugin):
policy_config = cni_plugin.network_config.get(POLICY_KEY, {})
network_name = cni_plugin.network_config["name"]
policy_type = policy_config.get("type")
k8s_config = cni_plugin.network_config.get("kubernetes", {})
supported_policy_types = [None,
POLICY_MODE_KUBERNETES,
POLICY_MODE_KUBERNETES_ANNOTATIONS]
@@ -423,7 +424,7 @@
client_key = policy_config.get(K8S_CLIENT_KEY_VAR)
certificate_authority = policy_config.get(
K8S_CERTIFICATE_AUTHORITY_VAR)
kubeconfig_path = policy_config.get("kubeconfig")
kubeconfig_path = k8s_config.get("kubeconfig")

if (client_key and not os.path.isfile(client_key)) or \
(client_certificate and not os.path.isfile(client_certificate)) or \
39 changes: 30 additions & 9 deletions tests/unit/test_policy_drivers.py
@@ -135,7 +135,7 @@ def setUp(self):
"policy": {}
}
self.driver = KubernetesAnnotationDriver(self.pod_name, self.namespace,
self.auth_token, self.api_root, None, None, None)
self.auth_token, self.api_root, None, None, None, None)
assert_equal(self.driver.profile_name, self.profile_name)

# Mock the DatastoreClient
@@ -161,7 +161,7 @@ def test_generate_rules_kube_system(self, m_get_pod):
# Generate rules
rules = self.driver.generate_rules()

# Assert correct. Should allow all.
# Assert correct. Should allow all.
expected = Rules(id=self.profile_name,
inbound_rules=[Rule(action="allow")],
outbound_rules=[Rule(action="allow")])
@@ -187,7 +187,7 @@ def test_generate_rules_annotations(self, m_get_pod):
# Generate rules
rules = self.driver.generate_rules()

# Assert correct. Should allow all.
# Assert correct. Should allow all.
expected = Rules(id=self.profile_name,
inbound_rules=[Rule(action="allow", protocol="tcp")],
outbound_rules=[Rule(action="allow")])
@@ -202,7 +202,7 @@ def test_generate_rules_parse_error(self, m_get_pod):

# Mock to raise error
self.driver.policy_parser = MagicMock(spec=self.driver.policy_parser)
self.driver.policy_parser.parse_line.side_effect = ValueError
self.driver.policy_parser.parse_line.side_effect = ValueError

# Generate rules
assert_raises(ApplyProfileError, self.driver.generate_rules)
@@ -211,7 +211,7 @@ def test_generate_tags(self):
# Mock get_metadata to return labels.
labels = {"key": "value"}
self.driver._get_metadata = MagicMock(spec=self.driver._get_metadata)
self.driver._get_metadata.return_value = labels
self.driver._get_metadata.return_value = labels

# Call
tags = self.driver.generate_tags()
@@ -257,6 +257,28 @@ def test_get_api_pod(self, m_json_load, m_session):
verify=False)
m_json_load.assert_called_once_with(pod1)

@patch("calico_cni.policy_drivers.HTTPClient", autospec=True)
@patch("calico_cni.policy_drivers.Query", autospec=True)
@patch("calico_cni.policy_drivers.KubeConfig", autospec=True)
def test_get_api_pod_kubeconfig(self, m_kcfg, m_query, m_http):
# Set up driver.
self.driver.pod_name = 'pod-1'
self.driver.namespace = 'a'

pod = Mock()
pod.obj = '{"metadata": {"namespace": "a", "name": "pod-1"}}'
m_query(1, 2, 3).get_by_name.return_value = pod

api_root = "http://kubernetesapi:8080/api/v1/"
self.driver.api_root = api_root
self.driver.kubeconfig_path = "/path/to/kubeconfig"

# Call method under test
p = self.driver._get_api_pod()

# Assert
assert_equal(p, pod.obj)

@patch('calico_cni.policy_drivers.requests.Session', autospec=True)
@patch('json.loads', autospec=True)
def test_get_api_pod_with_client_certs(self, m_json_load, m_session):
@@ -271,7 +293,6 @@ def test_get_api_pod_with_client_certs(self, m_json_load, m_session):
self.driver.client_key = "key.pem"
self.driver.certificate_authority = "ca.pem"


get_obj = Mock()
get_obj.status_code = 200
get_obj.text = pod1
@@ -353,7 +374,7 @@ def test_get_api_pod_parse_error(self, m_json_load, m_session):
# Set up mock objects
self.driver.auth_token = 'TOKEN'

m_json_load.side_effect = TypeError
m_json_load.side_effect = TypeError

get_obj = Mock()
get_obj.status_code = 200
@@ -388,7 +409,7 @@ def setUp(self):
self.network_name = "net-name"
self.namespace = "default"
self.driver = KubernetesPolicyDriver(self.network_name, self.namespace,
None, None, None, None, None)
None, None, None, None, None, None)

# Mock the DatastoreClient
self.client = MagicMock(spec=DatastoreClient)
@@ -417,7 +438,7 @@ def test_remove_profile(self):
self.driver.remove_profile()


class GetPolicyDriverTest(unittest.TestCase):
class GetPolicyDriverTest(unittest.TestCase):

def test_get_policy_driver_default_k8s(self):
cni_plugin = Mock(spec=CniPlugin)
