TF serving support request logging #1229

Merged (10 commits) on Jul 29, 2018
18 changes: 18 additions & 0 deletions components/k8s-model-server/fluentd-logger/Dockerfile
@@ -0,0 +1,18 @@
# Fluentd image with the fluent-plugin-bigquery plugin installed.

FROM fluent/fluentd:v1.2-debian

RUN buildDeps="sudo make gcc g++ libc-dev ruby-dev" \
    && apt-get update \
    && apt-get install -y --no-install-recommends $buildDeps \
    && sudo gem install \
        fluent-plugin-bigquery -v 2.0.0.beta \
    && sudo gem sources --clear-all \
    && SUDO_FORCE_REMOVE=yes \
       apt-get purge -y --auto-remove \
       -o APT::AutoRemove::RecommendsImportant=false \
       $buildDeps \
    && rm -rf /var/lib/apt/lists/* \
       /home/fluent/.gem/ruby/2.3.0/cache/*.gem

ENV FLUENTD_CONF custom/fluent.conf
29 changes: 27 additions & 2 deletions components/k8s-model-server/http-proxy/server.py
@@ -18,11 +18,14 @@

from itertools import repeat
import base64
import json
import logging
import logging.handlers
import grpc
import random

from google.protobuf.json_format import MessageToDict
import grpc
from grpc.beta import implementations
import numpy as np
from tensorflow_serving.apis import classification_pb2
from tensorflow_serving.apis import input_pb2
from tensorflow_serving.apis import predict_pb2
@@ -31,7 +34,6 @@
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.options import define, options, parse_command_line
import numpy as np
import tensorflow as tf
from tensorflow.python.saved_model import signature_constants
import tornado.web
@@ -43,6 +45,10 @@
define("rpc_address", default='localhost', help="tf serving on the given address", type=str)
define("instances_key", default='instances', help="requested instances json object key")
define("debug", default=False, help="run in debug mode")
define("log_request", default=False, help="whether to log requests")
define("request_log_file", default="/tmp/logs/request.log")
define("request_log_pos_file", default="/tmp/logs/request.log.pos")
define("request_log_prob", default=0.01, help="probability to log the request (will be sampled uniformly)")
B64_KEY = 'b64'
WELCOME = "Hello World"
MODEL_SERVER_METADATA_TIMEOUT_SEC = 20
@@ -248,6 +254,11 @@ def post(self, model_name, version_name=None):
        predictions = [dict(zip(*t)) for t in zip(repeat(output_keys), predictions)]
        self.write(dict(predictions=predictions))

        # Sample each instance independently with probability request_log_prob and
        # write it as a JSON line, so the fluentd sidecar can tail and stream it.
        if self.settings['request_logger'] is not None:
            for instance in instances:
                if random.random() < self.settings['request_log_prob']:
                    self.settings['request_logger'].info(json.dumps(instance))


class ClassifyHandler(tornado.web.RequestHandler):
"""
@@ -302,9 +313,23 @@ def main():

    channel = implementations.insecure_channel(options.rpc_address, options.rpc_port)
    stub = prediction_service_pb2.beta_create_PredictionService_stub(channel)

    if options.log_request:
        # Dedicated logger that appends sampled requests to a rotating file,
        # which the fluentd sidecar tails and streams to BigQuery.
        request_logger = logging.getLogger("RequestLogger")
        request_logger.setLevel(logging.INFO)
        rotate_handler = logging.handlers.RotatingFileHandler(
            options.request_log_file, maxBytes=1000000, backupCount=1)
        request_logger.addHandler(rotate_handler)
        # Touch the pos file so it exists before fluentd starts tailing.
        open(options.request_log_pos_file, "a").close()
    else:
        request_logger = None

    extra_settings = dict(
        stub = stub,
        signature_map = {},
        request_logger = request_logger,
        request_log_prob = options.request_log_prob,
    )
    app = get_application(**extra_settings)
    app.listen(options.port)
29 changes: 29 additions & 0 deletions components/k8s-model-server/request-logging.md
@@ -0,0 +1,29 @@
# Request logging for TF Serving

Request logging currently supports streaming the logged requests to BigQuery.

## Motivation
Logging the requests and responses enables log analysis, continuous training, and skew detection.

## Usage
Create a BigQuery dataset D and table T under your project P.
The table schema must also be set, because the fluentd plugin fetches it (`fetch_schema true`) when streaming rows into the table.
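
A minimal Python sketch of this setup step is shown below, assuming the `google-cloud-bigquery` client library; the single `image` string column is only a placeholder and should be replaced with whatever keys your request instances actually contain, since that schema is what fluentd fetches when inserting rows.

```
# Sketch only: create dataset D and table T under project P with a placeholder
# schema. Adjust the schema fields to match the JSON keys of your instances.
from google.cloud import bigquery

client = bigquery.Client(project="P")

dataset_ref = bigquery.DatasetReference("P", "D")
client.create_dataset(bigquery.Dataset(dataset_ref), exists_ok=True)

schema = [
    bigquery.SchemaField("image", "STRING"),  # placeholder column
]
client.create_table(bigquery.Table(dataset_ref.table("T"), schema=schema), exists_ok=True)
```

Then install the ksonnet package and generate the component: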

```
ks pkg install kubeflow/tf-serving
ks generate tf-serving-request-log mnist --gcpProject=P --dataset=D --table=T
```

Modify `tf-serving-with-request-log.jsonnet` as needed:
- Change the http proxy's logging parameters, e.g. `--request_log_prob=0.1` (the default is 0.01).

Then deploy the component:

```
ks apply YOUR_ENV -c mnist
```

Start sending requests; the fluentd sidecar tails the request log and streams the sampled entries to BigQuery.
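
For a quick end-to-end check, here is a sketch of sending one request through the HTTP proxy. The host, port, and URL path are assumptions (they depend on how the service is exposed and on the routes registered in `server.py`), and the payload uses the proxy's default `instances` key with the `b64` convention for binary data.

```
# Sketch only: the host/port and URL path are assumptions; adjust them to how
# the mnist service is reachable from your machine (e.g. via port-forwarding).
import base64
import json
import requests

with open("3.png", "rb") as f:  # any sample MNIST image
    image_b64 = base64.b64encode(f.read()).decode("utf-8")

payload = {"instances": [{"image": {"b64": image_b64}}]}
resp = requests.post(
    "http://localhost:8000/model/mnist:predict",  # assumed route
    data=json.dumps(payload),
    headers={"Content-Type": "application/json"},
)
print(resp.status_code, resp.json())
```

Once a few requests have gone through, you can spot-check the destination table, for example with `bq head -n 10 P:D.T`.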

## Next steps
1. Support backends other than BigQuery.
1. Support a request ID so that the logs can be joined. [Issue](https://github.com/kubeflow/kubeflow/issues/1220).
1. Optionally log responses and other metadata. This probably needs a log config beyond just a sampling probability.
230 changes: 230 additions & 0 deletions kubeflow/tf-serving/prototypes/tf-serving-with-request-log.jsonnet
@@ -0,0 +1,230 @@
// @apiVersion 0.1
// @name io.ksonnet.pkg.tf-serving-request-log
// @description tf-serving with request logging
// @shortDescription tf-serving with request logging
// @param name string Name to give to each of the components
// @param gcpProject string The gcp project for Bigquery dataset
// @param dataset string The Bigquery dataset
// @param table string The Bigquery table
// @optionalParam modelBasePath string gs://kubeflow-examples-data/mnist The model path
// @optionalParam modelName string mnist The model name

local k = import "k.libsonnet";

local namespace = "kubeflow";
local appName = import "param://name";
local image = "gcr.io/kubeflow-images-public/tf-model-server-cpu:v20180327-995786ec";
local httpProxyImage = "gcr.io/kubeflow-images-public/tf-model-server-http-proxy:v20180723";
local loggingImage = "gcr.io/kubeflow-images-public/tf-model-server-request-logger:v20180723";

local gcpSecretName = "user-gcp-sa";

local service = {
  apiVersion: "v1",
  kind: "Service",
  metadata: {
    labels: {
      app: appName,
    },
    name: appName,
    namespace: namespace,
  },
  spec: {
    ports: [
      {
        name: "grpc-tf-serving",
        port: 9000,
        targetPort: 9000,
      },
      {
        name: "http-tf-serving-proxy",
        port: 8000,
        targetPort: 8000,
      },
    ],
    selector: {
      app: appName,
    },
    type: "ClusterIP",
  },
};

local configMap = {
  apiVersion: "v1",
  kind: "ConfigMap",
  metadata: {
    name: appName + "fluentd-config",
    namespace: namespace,
  },
  data: {
    "fluent.conf": std.format(|||
      <source>
        @type tail
        path /tmp/logs/request.log
        pos_file /tmp/logs/request.log.pos
        <parse>
          @type json
        </parse>
        tag dummy
      </source>
      <match dummy>
        @type bigquery_insert
        auth_method application_default
        project %s
        dataset %s
        table %s
        fetch_schema true
      </match>
    |||, [params.gcpProject, params.dataset, params.table]),
  },
};

local deployment = {
  apiVersion: "extensions/v1beta1",
  kind: "Deployment",
  metadata: {
    labels: {
      app: appName,
    },
    name: appName,
    namespace: namespace,
  },
  spec: {
    template: {
      metadata: {
        labels: {
          app: appName,
        },
      },
      spec: {
        containers: [
          // ModelServer
          {
            args: [
              "/usr/bin/tensorflow_model_server",
              "--port=9000",
              "--model_name=" + params.modelName,
              "--model_base_path=" + params.modelBasePath,
            ],
            image: image,
            imagePullPolicy: "IfNotPresent",
            name: "model-server",
            ports: [
              {
                containerPort: 9000,
              },
            ],
            resources: {
              limits: {
                cpu: "4",
                memory: "4Gi",
              },
              requests: {
                cpu: "1",
                memory: "1Gi",
              },
            },
          },
          // Http proxy
          {
            name: "http-proxy",
            image: httpProxyImage,
            imagePullPolicy: "Always",
            command: [
              "python",
              "/usr/src/app/server.py",
              "--port=8000",
              "--rpc_port=9000",
              "--rpc_timeout=10.0",
              "--log_request=true",
            ],
            env: [],
            ports: [
              {
                containerPort: 8000,
              },
            ],
            resources: {
              requests: {
                memory: "1Gi",
                cpu: "1",
              },
              limits: {
                memory: "4Gi",
                cpu: "4",
              },
            },
            securityContext: {
              runAsUser: 1000,
              fsGroup: 1000,
            },
            volumeMounts: [
              {
                name: "request-logs",
                mountPath: "/tmp/logs",
              },
            ],
          },
          // TODO(lunkai): use admission controller to inject.
          // Logging container.
          {
            name: "logging",
            image: loggingImage,
            imagePullPolicy: "Always",
            env: [
              { name: "GOOGLE_APPLICATION_CREDENTIALS", value: "/secret/gcp-credentials/key.json" },
            ],
            resources: {
              requests: {
                memory: "250Mi",
                cpu: "0.25",
              },
              limits: {
                memory: "500Mi",
                cpu: "0.5",
              },
            },
            volumeMounts: [
              {
                name: "request-logs",
                mountPath: "/tmp/logs",
              },
              {
                name: "gcp-credentials",
                mountPath: "/secret/gcp-credentials",
              },
              {
                name: "fluentd-config-volume",
                mountPath: "/fluentd/etc/custom",
              },
            ],
          },
        ],
        volumes: [
          {
            name: "gcp-credentials",
            secret: {
              secretName: gcpSecretName,
            },
          },
          {
            name: "request-logs",
            emptyDir: {},
          },
          {
            configMap: {
              // Must match the ConfigMap defined above (appName + "fluentd-config").
              name: appName + "fluentd-config",
            },
            name: "fluentd-config-volume",
          },
        ],
      },
    },
  },
};

k.core.v1.list.new([
  service,
  deployment,
  configMap,
])