diff --git a/components/k8s-model-server/fluentd-logger/Dockerfile b/components/k8s-model-server/fluentd-logger/Dockerfile
new file mode 100644
index 00000000000..771de432669
--- /dev/null
+++ b/components/k8s-model-server/fluentd-logger/Dockerfile
@@ -0,0 +1,18 @@
+# Fluentd image with the BigQuery plugin installed.
+
+FROM fluent/fluentd:v1.2-debian
+
+RUN buildDeps="sudo make gcc g++ libc-dev ruby-dev" \
+    && apt-get update \
+    && apt-get install -y --no-install-recommends $buildDeps \
+    && sudo gem install \
+        fluent-plugin-bigquery -v 2.0.0.beta \
+    && sudo gem sources --clear-all \
+    && SUDO_FORCE_REMOVE=yes \
+        apt-get purge -y --auto-remove \
+        -o APT::AutoRemove::RecommendsImportant=false \
+        $buildDeps \
+    && rm -rf /var/lib/apt/lists/* \
+        /home/fluent/.gem/ruby/2.3.0/cache/*.gem
+
+ENV FLUENTD_CONF custom/fluent.conf
diff --git a/components/k8s-model-server/http-proxy/server.py b/components/k8s-model-server/http-proxy/server.py
index de9f6be0705..e5eebdf7757 100644
--- a/components/k8s-model-server/http-proxy/server.py
+++ b/components/k8s-model-server/http-proxy/server.py
@@ -18,11 +18,15 @@
 from itertools import repeat
 
 import base64
+import json
 import logging
-import grpc
+import logging.handlers
+import random
 
 from google.protobuf.json_format import MessageToDict
+import grpc
 from grpc.beta import implementations
+import numpy as np
 from tensorflow_serving.apis import classification_pb2
 from tensorflow_serving.apis import input_pb2
 from tensorflow_serving.apis import predict_pb2
@@ -31,7 +35,6 @@
 from tornado import gen
 from tornado.ioloop import IOLoop
 from tornado.options import define, options, parse_command_line
-import numpy as np
 import tensorflow as tf
 from tensorflow.python.saved_model import signature_constants
 import tornado.web
@@ -43,6 +46,10 @@
 define("rpc_address", default='localhost', help="tf serving on the given address", type=str)
 define("instances_key", default='instances', help="requested instances json object key")
 define("debug", default=False, help="run in debug mode")
+define("log_request", default=False, help="whether to log requests")
+define("request_log_file", default="/tmp/logs/request.log", help="request log file path")
+define("request_log_pos_file", default="/tmp/logs/request.log.pos", help="request log position file path")
+define("request_log_prob", default=0.01, help="probability of logging a request (sampled uniformly)")
 B64_KEY = 'b64'
 WELCOME = "Hello World"
 MODEL_SERVER_METADATA_TIMEOUT_SEC = 20
@@ -248,6 +255,11 @@ def post(self, model_name, version_name=None):
         predictions = [dict(zip(*t)) for t in zip(repeat(output_keys), predictions)]
         self.write(dict(predictions=predictions))
 
+        if self.settings['request_logger'] is not None:
+            for instance in instances:
+                if random.random() < self.settings['request_log_prob']:
+                    self.settings['request_logger'].info(json.dumps(instance))
+
 
 class ClassifyHandler(tornado.web.RequestHandler):
     """
@@ -302,9 +314,23 @@ def main():
 
     channel = implementations.insecure_channel(options.rpc_address, options.rpc_port)
     stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)
+
+    if options.log_request:
+        request_logger = logging.getLogger("RequestLogger")
+        request_logger.setLevel(logging.INFO)
+        rotate_handler = logging.handlers.RotatingFileHandler(
+            options.request_log_file, maxBytes=1000000, backupCount=1)
+        request_logger.addHandler(rotate_handler)
+        # Touch the pos file so it exists before fluentd starts tailing.
+        open(options.request_log_pos_file, "a").close()
+    else:
+        request_logger = None
+
     extra_settings = dict(
         stub = stub,
         signature_map = {},
+        request_logger = request_logger,
+        request_log_prob = options.request_log_prob,
     )
     app = get_application(**extra_settings)
     app.listen(options.port)
diff --git a/components/k8s-model-server/request-logging.md b/components/k8s-model-server/request-logging.md
new file mode 100644
index 00000000000..7b72bafc428
--- /dev/null
+++ b/components/k8s-model-server/request-logging.md
@@ -0,0 +1,59 @@
+# Request logging for TF Serving
+
+Currently, request logs can be streamed to BigQuery.
+
+## Motivation
+Logging the requests and responses enables log analysis, continuous training, and skew detection.
+
+## Usage
+Create the BigQuery dataset D and table T under your project P.
+The table schema should also be set.
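+
+For example, here is a minimal sketch of creating the table with the `bq` CLI.
+The `key` and `image` fields are only placeholders; the schema has to match the
+keys of the request instances that your model actually receives:
+
+```
+# schema.json (placeholder fields; adjust to your model's instances)
+[
+  {"name": "key", "type": "STRING"},
+  {"name": "image", "type": "FLOAT", "mode": "REPEATED"}
+]
+```
+
+```
+bq mk --dataset P:D
+bq mk --table P:D.T ./schema.json
+```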
+
+```
+ks pkg install kubeflow/tf-serving
+ks generate tf-serving-request-log mnist --gcpProject=P --dataset=D --table=T
+```
+
+Modify `tf-serving-with-request-log.jsonnet` as needed:
+ - adjust the http proxy's logging parameter, e.g. `--request_log_prob=0.1` (the default is 0.01).
+
+```
+ks apply YOUR_ENV -c mnist
+```
+
+Start sending requests, and the fluentd worker will stream them to BigQuery.
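+
+For instance, assuming the service is reachable on port 8000 and the proxy keeps
+its default route (the exact path and field names here are illustrative, not
+prescribed by this change), a request could look like:
+
+```
+curl -X POST -H "Content-Type: application/json" \
+  -d '{"instances": [{"key": "0", "image": [0.0, 0.0, 0.0]}]}' \
+  http://<service-ip>:8000/model/mnist:predict
+```
+
+Each instance in the request then has a `request_log_prob` chance of being written
+to `/tmp/logs/request.log`, where the fluentd sidecar picks it up.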
+
+## Next steps
+1. Support backends other than BigQuery.
+1. Support a request id so that the logs can be joined. [Issue](https://github.com/kubeflow/kubeflow/issues/1220).
+1. Optionally log the response and other metadata. We probably need a log config beyond just the sampling probability.
diff --git a/kubeflow/tf-serving/prototypes/tf-serving-with-request-log.jsonnet b/kubeflow/tf-serving/prototypes/tf-serving-with-request-log.jsonnet
new file mode 100644
index 00000000000..44ba455a1a0
--- /dev/null
+++ b/kubeflow/tf-serving/prototypes/tf-serving-with-request-log.jsonnet
@@ -0,0 +1,237 @@
+// @apiVersion 0.1
+// @name io.ksonnet.pkg.tf-serving-request-log
+// @description tf-serving with request logging
+// @shortDescription tf-serving with request logging
+// @param name string Name to give to each of the components
+// @param gcpProject string The GCP project for the BigQuery dataset
+// @param dataset string The BigQuery dataset
+// @param table string The BigQuery table
+// @optionalParam modelBasePath string gs://kubeflow-examples-data/mnist The model path
+// @optionalParam modelName string mnist The model name

+local k = import "k.libsonnet";
+
+local namespace = "kubeflow";
+local appName = import "param://name";
+local params = {
+  gcpProject: import "param://gcpProject",
+  dataset: import "param://dataset",
+  table: import "param://table",
+  modelBasePath: import "param://modelBasePath",
+  modelName: import "param://modelName",
+};
+local image = "gcr.io/kubeflow-images-public/tf-model-server-cpu:v20180327-995786ec";
+local httpProxyImage = "gcr.io/kubeflow-images-public/tf-model-server-http-proxy:v20180723";
+local loggingImage = "gcr.io/kubeflow-images-public/tf-model-server-request-logger:v20180723";
+
+local gcpSecretName = "user-gcp-sa";
+
+local service = {
+  apiVersion: "v1",
+  kind: "Service",
+  metadata: {
+    labels: {
+      app: appName,
+    },
+    name: appName,
+    namespace: namespace,
+  },
+  spec: {
+    ports: [
+      {
+        name: "grpc-tf-serving",
+        port: 9000,
+        targetPort: 9000,
+      },
+      {
+        name: "http-tf-serving-proxy",
+        port: 8000,
+        targetPort: 8000,
+      },
+    ],
+    selector: {
+      app: appName,
+    },
+    type: "ClusterIP",
+  },
+};
+
+local configMap = {
+  apiVersion: "v1",
+  kind: "ConfigMap",
+  metadata: {
+    name: appName + "fluentd-config",
+    namespace: namespace,
+  },
+  data: {
+    "fluent.conf": std.format(|||
+      <source>
+        @type tail
+        path /tmp/logs/request.log
+        pos_file /tmp/logs/request.log.pos
+        <parse>
+          @type json
+        </parse>
+        tag dummy
+      </source>
+      <match dummy>
+        @type bigquery_insert
+        auth_method application_default
+        project %s
+        dataset %s
+        table %s
+        fetch_schema true
+      </match>
+    |||, [params.gcpProject, params.dataset, params.table]),
+  },
+};
+
+local deployment = {
+  apiVersion: "extensions/v1beta1",
+  kind: "Deployment",
+  metadata: {
+    labels: {
+      app: appName,
+    },
+    name: appName,
+    namespace: namespace,
+  },
+  spec: {
+    template: {
+      metadata: {
+        labels: {
+          app: appName,
+        },
+      },
+      spec: {
+        containers: [
+          // ModelServer
+          {
+            args: [
+              "/usr/bin/tensorflow_model_server",
+              "--port=9000",
+              "--model_name=" + params.modelName,
+              "--model_base_path=" + params.modelBasePath,
+            ],
+            image: image,
+            imagePullPolicy: "IfNotPresent",
+            name: "model-server",
+            ports: [
+              {
+                containerPort: 9000,
+              },
+            ],
+            resources: {
+              limits: {
+                cpu: "4",
+                memory: "4Gi",
+              },
+              requests: {
+                cpu: "1",
+                memory: "1Gi",
+              },
+            },
+          },
+          // Http proxy
+          {
+            name: "http-proxy",
+            image: httpProxyImage,
+            imagePullPolicy: "Always",
+            command: [
+              "python",
+              "/usr/src/app/server.py",
+              "--port=8000",
+              "--rpc_port=9000",
+              "--rpc_timeout=10.0",
+              "--log_request=true",
+            ],
+            env: [],
+            ports: [
+              {
+                containerPort: 8000,
+              },
+            ],
+            resources: {
+              requests: {
+                memory: "1Gi",
+                cpu: "1",
+              },
+              limits: {
+                memory: "4Gi",
+                cpu: "4",
+              },
+            },
+            securityContext: {
+              runAsUser: 1000,
+              fsGroup: 1000,
+            },
+            volumeMounts: [
+              {
+                name: "request-logs",
+                mountPath: "/tmp/logs",
+              },
+            ],
+          },
+          // TODO(lunkai): use admission controller to inject.
+          // Logging container.
+          {
+            name: "logging",
+            image: loggingImage,
+            imagePullPolicy: "Always",
+            env: [
+              { name: "GOOGLE_APPLICATION_CREDENTIALS", value: "/secret/gcp-credentials/key.json" },
+            ],
+            resources: {
+              requests: {
+                memory: "250Mi",
+                cpu: "0.25",
+              },
+              limits: {
+                memory: "500Mi",
+                cpu: "0.5",
+              },
+            },
+            volumeMounts: [
+              {
+                name: "request-logs",
+                mountPath: "/tmp/logs",
+              },
+              {
+                name: "gcp-credentials",
+                mountPath: "/secret/gcp-credentials",
+              },
+              {
+                name: "fluentd-config-volume",
+                mountPath: "/fluentd/etc/custom",
+              },
+            ],
+          },
+        ],
+        volumes: [
+          {
+            name: "gcp-credentials",
+            secret: {
+              secretName: gcpSecretName,
+            },
+          },
+          {
+            name: "request-logs",
+            emptyDir: {},
+          },
+          {
+            configMap: {
+              name: appName + "fluentd-config",
+            },
+            name: "fluentd-config-volume",
+          },
+        ],
+      },
+    },
+  },
+};
+
+k.core.v1.list.new([
+  service,
+  deployment,
+  configMap,
+])