Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add in-cluster config support #32

Merged
merged 1 commit into from
Nov 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/example3.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def main():
name += "*"
name += v.version
versions.append(name)
print("%-20s %s" % (api.name, ",".join(versions)))
print("%-40s %s" % (api.name, ",".join(versions)))


if __name__ == '__main__':
Expand Down
2 changes: 2 additions & 0 deletions kubernetes/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@
# limitations under the License.

from .kube_config import load_kube_config
from .incluster_config import load_incluster_config
from .incluster_config import ConfigException
86 changes: 86 additions & 0 deletions kubernetes/config/incluster_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from kubernetes.client import configuration

_SERVICE_HOST_ENV_NAME = "KUBERNETES_SERVICE_HOST"
_SERVICE_PORT_ENV_NAME = "KUBERNETES_SERVICE_PORT"
_SERVICE_TOKEN_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/token"
_SERVICE_CERT_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"


def _join_host_port(host, port):
"""Adapted golang's net.JoinHostPort"""
template = "%s:%s"
host_requires_bracketing = ':' in host or '%' in host
if host_requires_bracketing:
template = "[%s]:%s"
return template % (host, port)


class ConfigException(Exception):
pass


class InClusterConfigLoader(object):
def __init__(self, host_env_name, port_env_name, token_filename,
cert_filename, environ=os.environ):
self._host_env_name = host_env_name
self._port_env_name = port_env_name
self._token_filename = token_filename
self._cert_filename = cert_filename
self._environ = environ

def load_and_set(self):
self._load_config()
self._set_config()

def _load_config(self):
if (self._host_env_name not in self._environ or
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the case where either of the env vars are defined but empty (e.g. KUBERNETES_SERVICE_HOST=)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed these comments, addressed them in #50

self._port_env_name not in self._environ):
raise ConfigException("Service host/port is not set.")

self.host = (
"https://" + _join_host_port(self._environ[self._host_env_name],
self._environ[self._port_env_name]))

if not os.path.isfile(self._token_filename):
raise ConfigException("Service token file does not exists.")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider separating reading of cluster config from modifying configuration to ensure it is only modified once configuration has been validated. This separation of concerns will also allow simplify testing by allowing the read values to be checked without involving configuration:

# Consider using namedtuple instead of a dict for simple types
ClusterConfig = collections.NamedTuple('ClusterConfig', ['host', 'token'...])

def load_incluster_config(environ=os.environ):
  conf = read_incluster_config(environ=environ)
  set_incluster_config(conf)

def read_incluster_config(environ=os.environ):
  # Consider checking the value of env vars rather than the presence of the name to protect against empty vars
  service_host = environ.get(_SERVICE_HOST_ENV_NAME)
  if not service_host:
    raise ConfigException("Service host not set.")
  ...
  host = 'https://%s' % (_join_host_port(service_host, service_port)
  return ClusterConfig(host, token...)

def set_incluster_config(conf):
  configuration.host = conf.host
  ...

This will also avoid the problem of modifying configuration in one unit test and accidentally having that modification affect the behavior of a different test that relies on its state. Due to configuration being a singleton, any changes to its default state must be reverted on test teardown.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've used the InClusterConfigLoader to store the values and separate load and set methods.


with open(self._token_filename) as f:
self.token = f.read()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the file is empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed these comments, addressed them in #50


if not os.path.isfile(self._cert_filename):
raise ConfigException(
"Service certification file does not exists.")

self.ssl_ca_cert = self._cert_filename

def _set_config(self):
configuration.host = self.host
configuration.ssl_ca_cert = self.ssl_ca_cert
configuration.api_key['authorization'] = "bearer " + self.token


def load_incluster_config():
"""Use the service account kubernetes gives to pods to connect to kubernetes
cluster. It's intended for clients that expect to be running inside a pod
running on kubernetes. It will raise an exception if called from a process
not running in a kubernetes environment."""
InClusterConfigLoader(host_env_name=_SERVICE_HOST_ENV_NAME,
port_env_name=_SERVICE_PORT_ENV_NAME,
token_filename=_SERVICE_TOKEN_FILENAME,
cert_filename=_SERVICE_CERT_FILENAME).load_and_set()
113 changes: 113 additions & 0 deletions kubernetes/config/incluster_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import tempfile
import unittest

from kubernetes.client import configuration

from .incluster_config import (ConfigException, InClusterConfigLoader,
_SERVICE_HOST_ENV_NAME, _SERVICE_PORT_ENV_NAME)

_TEST_TOKEN = "temp_token"
_TEST_HOST = "127.0.0.1"
_TEST_IPV6_HOST = "::1"
_TEST_PORT = "80"
_TEST_ENVIRON = {_SERVICE_HOST_ENV_NAME: _TEST_HOST,
_SERVICE_PORT_ENV_NAME: _TEST_PORT}
_TEST_IPV6_ENVIRON = {_SERVICE_HOST_ENV_NAME: _TEST_IPV6_HOST,
_SERVICE_PORT_ENV_NAME: _TEST_PORT}


class InClusterConfigTest(unittest.TestCase):

def setUp(self):
self._temp_files = []

def tearDown(self):
for f in self._temp_files:
os.remove(f)

def _create_file_with_temp_content(self, content=""):
handler, name = tempfile.mkstemp()
self._temp_files.append(name)
os.write(handler, str.encode(content))
os.close(handler)
return name

def get_test_loader(
self,
host_env_name=_SERVICE_HOST_ENV_NAME,
port_env_name=_SERVICE_PORT_ENV_NAME,
token_filename=None,
cert_filename=None,
environ=_TEST_ENVIRON):
if not token_filename:
token_filename = self._create_file_with_temp_content(_TEST_TOKEN)
if not cert_filename:
cert_filename = self._create_file_with_temp_content()
return InClusterConfigLoader(
host_env_name=host_env_name,
port_env_name=port_env_name,
token_filename=token_filename,
cert_filename=cert_filename,
environ=environ)

def test_load_config(self):
cert_filename = self._create_file_with_temp_content()
loader = self.get_test_loader(cert_filename=cert_filename)
loader._load_config()
self.assertEqual("https://%s:%s" % (_TEST_HOST, _TEST_PORT),
loader.host)
self.assertEqual(cert_filename, loader.ssl_ca_cert)
self.assertEqual(_TEST_TOKEN, loader.token)

def test_load_config_with_bracketed_hostname(self):
cert_filename = self._create_file_with_temp_content()
loader = self.get_test_loader(cert_filename=cert_filename,
environ=_TEST_IPV6_ENVIRON)
loader._load_config()
self.assertEqual("https://[%s]:%s" % (_TEST_IPV6_HOST, _TEST_PORT),
loader.host)
self.assertEqual(cert_filename, loader.ssl_ca_cert)
self.assertEqual(_TEST_TOKEN, loader.token)

def _should_fail_load(self, config_loader, reason):
try:
config_loader.load_and_set()
self.fail("Should fail because %s" % reason)
except ConfigException:
# expected
pass

def test_no_port(self):
loader = self.get_test_loader(port_env_name="not_exists_port")
self._should_fail_load(loader, "no port specified")

def test_no_host(self):
loader = self.get_test_loader(host_env_name="not_exists_host")
self._should_fail_load(loader, "no host specified")

def test_no_cert_file(self):
loader = self.get_test_loader(cert_filename="not_exists_file_1123")
self._should_fail_load(loader, "cert file does not exists")

def test_no_token_file(self):
loader = self.get_test_loader(token_filename="not_exists_file_1123")
self._should_fail_load(loader, "token file does not exists")


if __name__ == '__main__':
unittest.main()