Skip to content

Commit

Permalink
Add incluster config support
Browse files Browse the repository at this point in the history
  • Loading branch information
mbohlool committed Nov 23, 2016
1 parent 44395d4 commit e9568d2
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kubernetes/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@

from .kube_config import load_kube_config
from .watch import Watch
from .incluster_config import load_incluster_config
from .incluster_config import ConfigException
76 changes: 76 additions & 0 deletions kubernetes/util/incluster_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from kubernetes.client import configuration

_SERVICE_HOST_ENV_NAME = "KUBERNETES_SERVICE_HOST"
_SERVICE_PORT_ENV_NAME = "KUBERNETES_SERVICE_PORT"
_SERVICE_TOKEN_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/token"
_SERVICE_CERT_FILENAME = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"


def _join_host_port(host, port):
"""Adapted golang's net.JoinHostPort"""
if ':' in host or '%' in host:
return "[%s]:%s" % (host, str(port))
return "%s:%s" % (host, str(port))


class ConfigException(Exception):
pass


class InClusterConfigLoader(object):

def __init__(self, host_env_name, port_env_name, token_filename,
cert_filename, environ=os.environ):
self._host_env_name = host_env_name
self._port_env_name = port_env_name
self._token_filename = token_filename
self._cert_filename = cert_filename
self._environ = environ

def load(self):
if (self._host_env_name not in self._environ or
self._port_env_name not in self._environ):
raise ConfigException("Service host/port is not set.")

configuration.host = (
"https://" + _join_host_port(self._environ[self._host_env_name],
self._environ[self._port_env_name]))

if not os.path.isfile(self._token_filename):
raise ConfigException("Service token file does not exists.")

with open(self._token_filename, 'r') as f:
configuration.api_key['authorization'] = "bearer " + f.read()

if not os.path.isfile(self._cert_filename):
raise ConfigException(
"Service certification file does not exists.")

configuration.ssl_ca_cert = self._cert_filename


def load_incluster_config():
"""Use the service account kubernetes gives to pods to connect to kubernetes
cluster. It's intended for clients that expect to be running inside a pod
running on kubernetes. It will raise an exception if called from a process
not running in a kubernetes environment."""
InClusterConfigLoader(host_env_name=_SERVICE_HOST_ENV_NAME,
port_env_name=_SERVICE_PORT_ENV_NAME,
token_filename=_SERVICE_TOKEN_FILENAME,
cert_filename=_SERVICE_CERT_FILENAME).load()
101 changes: 101 additions & 0 deletions kubernetes/util/incluster_config_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import tempfile
import unittest

from kubernetes.client import configuration

from .incluster_config import ConfigException, InClusterConfigLoader

_TEST_PORT_ENV_NAME = "port"
_TEST_HOST_ENV_NAME = "host"
_TEST_TOKEN = "temp_token"
_TEST_HOST = "127.0.0.1"
_TEST_PORT = "80"
_TEST_ENVIRON = {_TEST_HOST_ENV_NAME: _TEST_HOST,
_TEST_PORT_ENV_NAME: _TEST_PORT}


class InClusterConfigTest(unittest.TestCase):

def setUp(self):
self._temp_files = []

def tearDown(self):
for f in self._temp_files:
os.remove(f)

def _create_file_with_temp_content(self, content=""):
handler, name = tempfile.mkstemp()
self._temp_files.append(name)
os.write(handler, str.encode(content))
os.close(handler)
return name

def get_test_loader(
self,
host_env_name=_TEST_HOST_ENV_NAME,
port_env_name=_TEST_PORT_ENV_NAME,
token_filename=None,
cert_filename=None,
environ=_TEST_ENVIRON):
if not token_filename:
token_filename = self._create_file_with_temp_content(_TEST_TOKEN)
if not cert_filename:
cert_filename = self._create_file_with_temp_content()
return InClusterConfigLoader(
host_env_name=host_env_name,
port_env_name=port_env_name,
token_filename=token_filename,
cert_filename=cert_filename,
environ=environ)

def test_load_config(self):
cert_filename = self._create_file_with_temp_content()
self.get_test_loader(cert_filename=cert_filename).load()
self.assertEqual("https://%s:%s" % (_TEST_HOST, _TEST_PORT),
configuration.host)
self.assertEqual(cert_filename, configuration.ssl_ca_cert)
self.assertEqual("bearer %s" % _TEST_TOKEN,
configuration.api_key['authorization'])

def _should_fail_load(self, config_loader, reason):
try:
config_loader.load()
self.fail("Should fail because %s" % reason)
except ConfigException:
# expected
pass

def test_no_port(self):
loader = self.get_test_loader(port_env_name="not_exists_port")
self._should_fail_load(loader, "no port specified")

def test_no_host(self):
loader = self.get_test_loader(host_env_name="not_exists_host")
self._should_fail_load(loader, "no host specified")

def test_no_cert_file(self):
loader = self.get_test_loader(cert_filename="not_exists_file_1123")
self._should_fail_load(loader, "cert file does not exists")

def test_no_token_file(self):
loader = self.get_test_loader(token_filename="not_exists_file_1123")
self._should_fail_load(loader, "token file does not exists")


if __name__ == '__main__':
unittest.main()

0 comments on commit e9568d2

Please sign in to comment.