Skip to content

Commit

Permalink
Adds support for kubectl proxy in kubernetes container manager (#455)
Browse files Browse the repository at this point in the history
* added support for kubectl proxy in kubernetes container manager

* run kubectl proxy in background

* fixed bug

* fix k8s proxy addr

* wait for clipper to initialize

* caffe2 problems, see #479
  • Loading branch information
dcrankshaw committed May 10, 2018
1 parent 7e6988b commit fc7dc73
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 57 deletions.
2 changes: 2 additions & 0 deletions bin/ci_checks.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ python $DIR/construct_kube_config.py $KUBECONFIG

# Test K8s cluster access
kubectl get nodes
# Set kubectl proxy for k8s tests later
kubectl proxy --port 8080 &

$DIR/check_format.sh
$DIR/run_unittests.sh
4 changes: 3 additions & 1 deletion clipper_admin/clipper_admin/clipper_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ def start_clipper(
try:
url = "http://{host}/metrics".format(
host=self.cm.get_query_addr())
requests.get(url, timeout=5)
r = requests.get(url, timeout=5)
if r.status_code != requests.codes.ok:
raise RequestException
break
except RequestException as e:
logger.info("Clipper still initializing.")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import absolute_import, division, print_function
from ..container_manager import (create_model_container_label,
ContainerManager, CLIPPER_DOCKER_LABEL,
CLIPPER_MODEL_CONTAINER_LABEL)
from ..container_manager import (
create_model_container_label, ContainerManager, CLIPPER_DOCKER_LABEL,
CLIPPER_MODEL_CONTAINER_LABEL, CLIPPER_INTERNAL_MANAGEMENT_PORT,
CLIPPER_INTERNAL_QUERY_PORT, CLIPPER_INTERNAL_METRIC_PORT)
from ..exceptions import ClipperException
from .kubernetes_metric_utils import start_prometheus, CLIPPER_FRONTEND_EXPORTER_IMAGE

Expand Down Expand Up @@ -34,34 +35,45 @@ def _pass_conflicts():

class KubernetesContainerManager(ContainerManager):
def __init__(self,
kubernetes_api_ip,
kubernetes_proxy_addr=None,
redis_ip=None,
redis_port=6379,
useInternalIP=False):
"""
Parameters
----------
kubernetes_api_ip : str
The hostname or IP address of the Kubernetes API server for your Kubernetes cluster.
kubernetes_proxy_addr : str, optional
The proxy address if you are proxying connections locally using ``kubectl proxy``.
If this argument is provided, Clipper will construct the appropriate proxy
URLs for accessing Clipper's Kubernetes services, rather than using the API server
addres provided in your kube config.
redis_ip : str, optional
The address of a running Redis cluster. If set to None, Clipper will start
a Redis deployment for you.
redis_port : int, optional
The Redis port. If ``redis_ip`` is set to None, Clipper will start Redis on this port.
If ``redis_ip`` is provided, Clipper will connect to Redis on this port.
useInternalIP : bool, optional
Use Internal IP of the K8S nodes . If ``useInternalIP`` is set to False, Clipper will throw an exception, if none of the nodes have ExternalDNS .
If ``useInternalIP`` is set to true, Clipper will use the Internal IP of the K8S node if no ExternalDNS exists for any of the nodes.
Use Internal IP of the K8S nodes . If ``useInternalIP`` is set to False, Clipper will
throw an exception if none of the nodes have ExternalDNS.
If ``useInternalIP`` is set to true, Clipper will use the Internal IP of the K8S node
if no ExternalDNS exists for any of the nodes.
Note
----
Clipper stores all persistent configuration state (such as registered application and model
information) in Redis. If you want Clipper to be durable and able to recover from failures,
we recommend configuring your own persistent and replicated Redis cluster rather than letting
Clipper launch one for you.
we recommend configuring your own persistent and replicated Redis cluster rather than
letting Clipper launch one for you.
"""

self.kubernetes_api_ip = kubernetes_api_ip
if kubernetes_proxy_addr is not None:
self.kubernetes_proxy_addr = kubernetes_proxy_addr
self.use_k8s_proxy = True
else:
self.use_k8s_proxy = False

self.redis_ip = redis_ip
self.redis_port = redis_port
self.useInternalIP = useInternalIP
Expand Down Expand Up @@ -141,7 +153,7 @@ def connect(self):
external_node_hosts.append(addr.address)

if len(external_node_hosts) == 0 and (self.useInternalIP):
msg = "No external node addresses found.Using Internal IP address"
msg = "No external node addresses found. Using Internal IP address"
logger.warn(msg)
for addr in node.status.addresses:
if addr.type == "InternalIP":
Expand Down Expand Up @@ -264,7 +276,7 @@ def deploy_model(self, name, version, input_type, image, num_replicas=1):

while self._k8s_beta.read_namespaced_deployment_status(
name=deployment_name, namespace='default').status.available_replicas \
!= num_replicas:
!= num_replicas:
time.sleep(3)

def get_num_replicas(self, name, version):
Expand All @@ -287,7 +299,6 @@ def set_num_replicas(self, name, version, input_type, image, num_replicas):
}
})


while self._k8s_beta.read_namespaced_deployment_status(
name=deployment_name, namespace='default').status.available_replicas \
!= num_replicas:
Expand Down Expand Up @@ -387,17 +398,40 @@ def get_registry(self):
return self.registry

def get_admin_addr(self):
return "{host}:{port}".format(
host=self.external_node_hosts[0],
port=self.clipper_management_port)
if self.use_k8s_proxy:
return ("{proxy_addr}/api/v1/namespaces/{ns}/"
"services/mgmt-frontend:{port}/proxy").format(
proxy_addr=self.kubernetes_proxy_addr,
ns="default",
port=CLIPPER_INTERNAL_MANAGEMENT_PORT)

else:
return "{host}:{port}".format(
host=self.external_node_hosts[0],
port=self.clipper_management_port)

def get_query_addr(self):
return "{host}:{port}".format(
host=self.external_node_hosts[0], port=self.clipper_query_port)
if self.use_k8s_proxy:
return ("{proxy_addr}/api/v1/namespaces/{ns}/"
"services/query-frontend:{port}/proxy").format(
proxy_addr=self.kubernetes_proxy_addr,
ns="default",
port=CLIPPER_INTERNAL_QUERY_PORT)
else:
return "{host}:{port}".format(
host=self.external_node_hosts[0], port=self.clipper_query_port)

def get_metric_addr(self):
return "{host}:{port}".format(
host=self.external_node_hosts[0], port=self.clipper_metric_port)
if self.use_k8s_proxy:
return ("{proxy_addr}/api/v1/namespaces/{ns}/"
"services/metrics:{port}/proxy").format(
proxy_addr=self.kubernetes_proxy_addr,
ns="default",
port=CLIPPER_INTERNAL_METRIC_PORT)
else:
return "{host}:{port}".format(
host=self.external_node_hosts[0],
port=self.clipper_metric_port)


def get_model_deployment_name(name, version):
Expand Down
70 changes: 41 additions & 29 deletions integration-tests/kubernetes_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,36 @@ def create_and_test_app(clipper_conn, name, num_models):
time.sleep(1)


def test_kubernetes(clipper_conn, num_apps, num_models):
time.sleep(10)
print(clipper_conn.cm.get_query_addr())
print(clipper_conn.inspect_instance())
try:
logger.info("Running integration test with %d apps and %d models" %
(num_apps, num_models))
for a in range(num_apps):
create_and_test_app(clipper_conn, "testapp%s" % a, num_models)

if not os.path.exists(CLIPPER_TEMP_DIR):
os.makedirs(CLIPPER_TEMP_DIR)
tmp_log_dir = tempfile.mkdtemp(dir=CLIPPER_TEMP_DIR)
logger.info(clipper_conn.get_clipper_logs(tmp_log_dir))
# Remove temp files
shutil.rmtree(tmp_log_dir)
log_clipper_state(clipper_conn)
logger.info("SUCCESS")
except BenchmarkException:
log_clipper_state(clipper_conn)
logger.exception("BenchmarkException")
create_kubernetes_connection(cleanup=True, start_clipper=False)
sys.exit(1)
except ClipperException:
log_clipper_state(clipper_conn)
logger.exception("ClipperException")
create_kubernetes_connection(cleanup=True, start_clipper=False)
sys.exit(1)


if __name__ == "__main__":
num_apps = 3
num_models = 3
Expand All @@ -114,36 +144,18 @@ def create_and_test_app(clipper_conn, name, num_models):
# for num_apps and num_models
pass
try:
# Test without proxy first
clipper_conn = create_kubernetes_connection(
cleanup=True, start_clipper=True, with_proxy=False)
test_kubernetes(clipper_conn, num_apps, num_models)
clipper_conn.stop_all()

# Test with proxy. Assumes proxy is running at 127.0.0.1:8080
clipper_conn = create_kubernetes_connection(
cleanup=True, start_clipper=True)
time.sleep(10)
print(clipper_conn.cm.get_query_addr())
print(clipper_conn.inspect_instance())
try:
logger.info("Running integration test with %d apps and %d models" %
(num_apps, num_models))
for a in range(num_apps):
create_and_test_app(clipper_conn, "testapp%s" % a, num_models)

if not os.path.exists(CLIPPER_TEMP_DIR):
os.makedirs(CLIPPER_TEMP_DIR)
tmp_log_dir = tempfile.mkdtemp(dir=CLIPPER_TEMP_DIR)
logger.info(clipper_conn.get_clipper_logs(tmp_log_dir))
# Remove temp files
shutil.rmtree(tmp_log_dir)
log_clipper_state(clipper_conn)
logger.info("SUCCESS")
clipper_conn.stop_all()
except BenchmarkException:
log_clipper_state(clipper_conn)
logger.exception("BenchmarkException")
create_kubernetes_connection(cleanup=True, start_clipper=False)
sys.exit(1)
except ClipperException:
log_clipper_state(clipper_conn)
logger.exception("ClipperException")
create_kubernetes_connection(cleanup=True, start_clipper=False)
sys.exit(1)
cleanup=True, start_clipper=True, with_proxy=True)
test_kubernetes(clipper_conn, 1, 1)
clipper_conn.stop_all()

except Exception as e:
logger.exception("Exception: {}".format(e))
sys.exit(1)
17 changes: 11 additions & 6 deletions integration-tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import tempfile
cur_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.abspath("%s/../clipper_admin" % cur_dir))
from clipper_admin import ClipperConnection, DockerContainerManager, KubernetesContainerManager, CLIPPER_TEMP_DIR
from clipper_admin import (ClipperConnection, DockerContainerManager,
KubernetesContainerManager, CLIPPER_TEMP_DIR)
from clipper_admin.container_manager import CLIPPER_DOCKER_LABEL
from clipper_admin import __version__ as clipper_version

Expand Down Expand Up @@ -96,11 +97,15 @@ def create_docker_connection(cleanup=True, start_clipper=True):
return cl


def create_kubernetes_connection(cleanup=True, start_clipper=True):
def create_kubernetes_connection(cleanup=True,
start_clipper=True,
with_proxy=False):
logging.info("Creating KubernetesContainerManager")
kubernetes_ip = "https://api.cluster.clipper-k8s-testing.com"
logging.info("Kubernetes IP: %s" % kubernetes_ip)
cm = KubernetesContainerManager(kubernetes_ip)

if with_proxy:
cm = KubernetesContainerManager(kubernetes_proxy_addr="127.0.0.1:8080")
else:
cm = KubernetesContainerManager()
cl = ClipperConnection(cm)
if cleanup:
cl.stop_all()
Expand All @@ -119,7 +124,7 @@ def create_kubernetes_connection(cleanup=True, start_clipper=True):
else:
try:
cl.connect()
except Exception as e:
except Exception:
pass
return cl

Expand Down

0 comments on commit fc7dc73

Please sign in to comment.