/
python.py
176 lines (154 loc) · 7.43 KB
/
python.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
from __future__ import print_function, with_statement, absolute_import
import logging
import os
import posixpath
import shutil
from ..version import __version__
from .deployer_utils import save_python_function
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def create_endpoint(
        clipper_conn,
        name,
        input_type,
        func,
        default_output="None",
        version=1,
        slo_micros=3000000,
        labels=None,
        registry=None,
        base_image="clipper/python-closure-container:{}".format(__version__),
        num_replicas=1):
    """Register an application, deploy ``func`` as its model, and link the two.

    This is a convenience wrapper performing three steps in order: the
    application is registered under ``name``, the prediction function is
    deployed as a model that is also called ``name``, and finally the model
    is linked to the application.

    Parameters
    ----------
    clipper_conn : :py:meth:`clipper_admin.ClipperConnection`
        A ``ClipperConnection`` object connected to a running Clipper cluster.
    name : str
        Name shared by the registered application and the deployed model.
    input_type : str
        Input type associated with both the application and the model.
        One of "integers", "floats", "doubles", "bytes", or "strings".
    func : function
        The prediction function. State associated with the function is
        captured via closure capture and pickled with Cloudpickle.
    default_output : str, optional
        Output returned by the application whenever it cannot obtain a
        response from a model within the query latency SLO (service level
        objective). The reason the default output was returned is always
        provided as part of the prediction response object.
        Defaults to "None".
    version : str, optional
        Version assigned to the model. Versions must be unique on a
        per-model basis, but may be re-used across different models.
        Note that the default value is the integer ``1``.
    slo_micros : int, optional
        Query latency objective for the application, in microseconds. It
        covers the processing latency between Clipper receiving a request
        and sending a response, and excludes network latency before the
        request is received or after the response is sent. When a query
        cannot be processed within the objective the default output is
        returned, so avoid setting the SLO aggressively low unless
        absolutely necessary. 100000 (100ms) is a good starting value, but
        the optimal objective varies by application.
        Defaults to 3000000.
    labels : list(str), optional
        Strings annotating the model. These are ignored by Clipper and used
        purely for user annotations.
    registry : str, optional
        Docker container registry to push the freshly built model to. When
        running Clipper on Kubernetes, this registry must be accessible to
        the Kubernetes cluster so it can fetch the container.
    base_image : str, optional
        Base Docker image the new model image is built from. It should
        contain all code necessary to run a Clipper model container RPC
        client.
    num_replicas : int, optional
        Number of replicas of the model to create. This can be changed at
        any time with
        :py:meth:`clipper.ClipperConnection.set_num_replicas`.
    """
    # Step 1: the application must exist before a model can be linked to it.
    clipper_conn.register_application(
        name, input_type, default_output, slo_micros)

    # Step 2: build and deploy the closure as a model with the same name.
    deploy_python_closure(
        clipper_conn,
        name,
        version,
        input_type,
        func,
        base_image=base_image,
        labels=labels,
        registry=registry,
        num_replicas=num_replicas)

    # Step 3: application and model share ``name``, hence (name, name).
    clipper_conn.link_model_to_app(name, name)
def deploy_python_closure(
        clipper_conn,
        name,
        version,
        input_type,
        func,
        base_image="clipper/python-closure-container:{}".format(__version__),
        labels=None,
        registry=None,
        num_replicas=1):
    """Deploy an arbitrary Python function to Clipper.

    The function should take a list of inputs of the type specified by
    `input_type` and return a Python list or numpy array of predictions as
    strings.

    The function is first saved to a temporary directory, which is then
    handed to ``clipper_conn.build_and_deploy_model``. The temporary
    directory is removed afterwards, whether or not the deployment
    succeeded; any exception raised by the deployment propagates to the
    caller.

    Parameters
    ----------
    clipper_conn : :py:meth:`clipper_admin.ClipperConnection`
        A ``ClipperConnection`` object connected to a running Clipper cluster.
    name : str
        The name to be assigned to the deployed model.
    version : str
        The version to assign this model. Versions must be unique on a
        per-model basis, but may be re-used across different models.
    input_type : str
        The input_type to be associated with the deployed model.
        One of "integers", "floats", "doubles", "bytes", or "strings".
    func : function
        The prediction function. Any state associated with the function will
        be captured via closure capture and pickled with Cloudpickle.
    base_image : str, optional
        The base Docker image to build the new model image from. This
        image should contain all code necessary to run a Clipper model
        container RPC client.
    labels : list(str), optional
        A list of strings annotating the model. These are ignored by Clipper
        and used purely for user annotations.
    registry : str, optional
        The Docker container registry to push the freshly built model to.
        Note that if you are running Clipper on Kubernetes, this registry
        must be accessible to the Kubernetes cluster in order to fetch the
        container from the registry.
    num_replicas : int, optional
        The number of replicas of the model to create. The number of replicas
        for a model can be changed at any time with
        :py:meth:`clipper.ClipperConnection.set_num_replicas`.

    Example
    -------
    Define a pre-processing function ``center()`` and train a model on the
    pre-processed input::

        from clipper_admin import ClipperConnection, DockerContainerManager
        from clipper_admin.deployers.python import deploy_python_closure
        import numpy as np
        import sklearn

        clipper_conn = ClipperConnection(DockerContainerManager())

        # Connect to an already-running Clipper cluster
        clipper_conn.connect()

        def center(xs):
            means = np.mean(xs, axis=0)
            return xs - means

        centered_xs = center(xs)
        model = sklearn.linear_model.LogisticRegression()
        model.fit(centered_xs, ys)

        # Note that this function accesses the trained model via closure
        # capture, rather than having the model passed in as an explicit
        # argument.
        def centered_predict(inputs):
            centered_inputs = center(inputs)
            # model.predict returns a list of predictions
            preds = model.predict(centered_inputs)
            return [str(p) for p in preds]

        deploy_python_closure(
            clipper_conn,
            name="example",
            version=1,
            input_type="doubles",
            func=centered_predict)
    """
    serialization_dir = save_python_function(name, func)

    # Special handling for Windows, which uses backslash for path delimiting.
    # NOTE(review): os.path.split only separates the final path component, so
    # only the last separator is rewritten here -- confirm that is sufficient
    # for the container manager on Windows.
    serialization_dir = posixpath.join(*os.path.split(serialization_dir))
    logger.info("Python closure saved")

    try:
        # Deploy function
        clipper_conn.build_and_deploy_model(name, version, input_type,
                                            serialization_dir, base_image,
                                            labels, registry, num_replicas)
    finally:
        # Remove temp files even when the build/deploy step raises, so a
        # failed deployment does not leave the serialized closure behind.
        shutil.rmtree(serialization_dir)