Skip to content

Commit

Permalink
Make metric collector emit k8s event (#1161)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunmingg authored and k8s-ci-robot committed Jul 11, 2018
1 parent 0c69372 commit a730ddf
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 4 deletions.
65 changes: 64 additions & 1 deletion kubeflow/core/metric-collector.libsonnet
Original file line number Diff line number Diff line change
@@ -1,13 +1,75 @@
{
all(params):: [
$.parts(params.namespace, params.metricImage, params.targetUrl, params.oauthSecretName).metricServiceAccount,
$.parts(params.namespace, params.metricImage, params.targetUrl, params.oauthSecretName).metricRole,
$.parts(params.namespace, params.metricImage, params.targetUrl, params.oauthSecretName).metricRoleBinding,
$.parts(params.namespace, params.metricImage, params.targetUrl, params.oauthSecretName).deploy,
],

parts(namespace, image, targetUrl, oauthSecretName):: {
metricServiceAccount: {
apiVersion: "v1",
kind: "ServiceAccount",
metadata: {
labels: {
app: "metric-collector",
},
name: "metric-collector",
namespace: namespace,
},
},
metricRole:: {
apiVersion: "rbac.authorization.k8s.io/v1beta1",
kind: "ClusterRole",
metadata: {
labels: {
app: "metric-collector",
},
name: "metric-collector",
},
rules: [
{
apiGroups: [
"",
],
resources: [
"services",
"events",
],
verbs: [
"*",
],
},
],
},
metricRoleBinding:: {
apiVersion: "rbac.authorization.k8s.io/v1beta1",
kind: "ClusterRoleBinding",
metadata: {
labels: {
app: "metric-collector",
},
name: "metric-collector",
},
roleRef: {
apiGroup: "rbac.authorization.k8s.io",
kind: "ClusterRole",
name: "metric-collector",
},
subjects: [
{
kind: "ServiceAccount",
name: "metric-collector",
namespace: namespace,
},
],
},
deploy:: {
apiVersion: "extensions/v1beta1",
kind: "Deployment",
metadata: {
labels: {
app: "metric-collector",
},
name: "metric-collector",
namespace: namespace,
},
Expand Down Expand Up @@ -57,6 +119,7 @@
name: "exporter",
},
],
serviceAccountName: "metric-collector",
restartPolicy: "Always",
volumes: [
{
Expand Down
1 change: 1 addition & 0 deletions metric-collector/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ RUN set -ex \
RUN pip3 install --upgrade wheel && \
pip3 install requests && \
pip3 install prometheus_client && \
pip3 install kubernetes && \
pip3 install google-api-python-client

COPY service-readiness/kubeflow-readiness.py /opt/
46 changes: 43 additions & 3 deletions metric-collector/service-readiness/kubeflow-readiness.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import argparse
from time import sleep, time

import logging
import google.auth
import google.auth.app_engine
import google.auth.compute_engine.credentials
Expand All @@ -10,6 +11,9 @@
import google.oauth2.service_account
from prometheus_client import start_http_server, Gauge
import requests
from kubernetes import client, config
from kubernetes.client import V1Event, V1ObjectMeta


IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
Expand All @@ -23,8 +27,10 @@ def metric_update(args, google_open_id_connect_token):
google_open_id_connect_token)})
if resp.status_code == 200:
KUBEFLOW_AVAILABILITY.set(1)
return 1
else:
KUBEFLOW_AVAILABILITY.set(0)
return 0

def main(unparsed_args=None):
parser = argparse.ArgumentParser(
Expand All @@ -43,8 +49,8 @@ def main(unparsed_args=None):

args = parser.parse_args(args=unparsed_args)

if args.url == "":
sleep(2000)
if args.url == "" or args.client_id == "":
logging.info("Url or client_id is empty, exit")
return

# Figure out what environment we're running in and get some preliminary
Expand Down Expand Up @@ -90,6 +96,9 @@ def main(unparsed_args=None):
})

token_refresh_time = 0
last_status = -1
config.load_incluster_config()
coreApi = client.CoreV1Api()
while True:
if time() > token_refresh_time:
# service_account_credentials gives us a JWT signed by the service
Expand All @@ -98,7 +107,38 @@ def main(unparsed_args=None):
google_open_id_connect_token = get_google_open_id_connect_token(
service_account_credentials)
token_refresh_time = time() + 1800
metric_update(args, google_open_id_connect_token)
url_status = metric_update(args, google_open_id_connect_token)
if url_status != last_status:
last_status = url_status
# get service centraldashboard, attach event to it.
svcs = coreApi.list_namespaced_service('kubeflow', label_selector="app=centraldashboard")
while len(svcs.to_dict()['items']) == 0:
logging.info("Service centraldashboard not ready...")
sleep(10)
svcs = coreApi.list_namespaced_service('kubeflow', label_selector="app=centraldashboard")
uid = svcs.to_dict()['items'][0]['metadata']['uid']
kf_status = "up" if url_status == 1 else "down"
new_event = V1Event(
action="Kubeflow service status update: " + kf_status,
api_version="v1",
kind="Event",
message="Service " + kf_status + "; service url: " + args.url,
reason="Kubeflow Service is " + kf_status,
involved_object=client.V1ObjectReference(
api_version="v1",
kind="Service",
name="centraldashboard",
namespace="kubeflow",
uid=uid
),
metadata=V1ObjectMeta(
generate_name='kubeflow-service.',
),
type="Normal"
)
event = coreApi.create_namespaced_event("kubeflow", new_event)
print("New status event created. action='%s'" % str(event.action))

# Update status every 10 sec
sleep(10)

Expand Down

0 comments on commit a730ddf

Please sign in to comment.