Skip to content

Commit 207e057

Browse files
committedDec 2, 2024
fix(metrics): Fix model label metric in case of experiment (#6118)
* use internal model header to compute model label * add test for rest * add test for grpc rproxy * adjust log message
1 parent 3d9cb60 commit 207e057

File tree

4 files changed

+112
-44
lines changed

4 files changed

+112
-44
lines changed
 

‎scheduler/pkg/agent/rproxy.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,15 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e
7676
var originalBody []byte
7777
var err error
7878

79-
externalModelName := req.Header.Get(resources.SeldonModelHeader)
8079
internalModelName := req.Header.Get(resources.SeldonInternalModelHeader)
80+
// externalModelName is the name of the model as it is known to the client, we should not use
81+
// resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy)
82+
// however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader
83+
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
84+
if err != nil {
85+
t.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName)
86+
externalModelName = req.Header.Get(resources.SeldonModelHeader)
87+
}
8188

8289
// to sync between scalingMetricsSetup and scalingMetricsTearDown calls running in go routines
8390
var wg sync.WaitGroup
@@ -139,8 +146,15 @@ func (rp *reverseHTTPProxy) addHandlers(proxy http.Handler) http.Handler {
139146
rp.logger.Debugf("Received request with host %s and internal header %v", r.Host, r.Header.Values(resources.SeldonInternalModelHeader))
140147
rewriteHostHandler(r)
141148

142-
externalModelName := r.Header.Get(resources.SeldonModelHeader)
143149
internalModelName := r.Header.Get(resources.SeldonInternalModelHeader)
150+
// externalModelName is the name of the model as it is known to the client, we should not use
151+
// resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy)
152+
// however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader
153+
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
154+
if err != nil {
155+
rp.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName)
156+
externalModelName = r.Header.Get(resources.SeldonModelHeader)
157+
}
144158

145159
//TODO should we return a 404 if headers not found?
146160
if externalModelName == "" || internalModelName == "" {

‎scheduler/pkg/agent/rproxy_grpc.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,10 @@ func extractModelNamesFromHeaders(ctx context.Context) (string, string, bool) {
355355
md, ok := metadata.FromIncomingContext(ctx)
356356
if ok {
357357
internalModelName := extractHeader(resources.SeldonInternalModelHeader, md)
358-
externalModelName := extractHeader(resources.SeldonModelHeader, md)
358+
externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName)
359+
if err != nil {
360+
externalModelName = extractHeader(resources.SeldonModelHeader, md)
361+
}
359362
return internalModelName, externalModelName, internalModelName != "" && externalModelName != ""
360363
}
361364
return "", "", false

‎scheduler/pkg/agent/rproxy_grpc_test.go

+19-11
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ import (
2828
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
2929
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
3030
testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
31+
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
3132
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
3233
)
3334

34-
func setupReverseGRPCService(numModels int, modelPrefix string, backEndGRPCPort, rpPort, backEndServerPort int) *reverseGRPCProxy {
35+
func setupReverseGRPCService(numModels int, modelPrefix string, backEndGRPCPort, rpPort, backEndServerPort int, metricsHandler metrics.AgentMetricsHandler) *reverseGRPCProxy {
3536
logger := log.New()
3637
log.SetLevel(log.DebugLevel)
3738

@@ -41,7 +42,7 @@ func setupReverseGRPCService(numModels int, modelPrefix string, backEndGRPCPort,
4142
modelscaling.NewModelReplicaLagsKeeper(), modelscaling.NewModelReplicaLastUsedKeeper(),
4243
)
4344
rp := NewReverseGRPCProxy(
44-
newFakeMetricsHandler(),
45+
metricsHandler,
4546
logger,
4647
"localhost",
4748
uint(backEndGRPCPort),
@@ -95,7 +96,8 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
9596
if err != nil {
9697
t.Fatal(err)
9798
}
98-
rpGRPC := setupReverseGRPCService(10, dummyModelNamePrefix, backEndGRPCPort, rpPort, serverPort)
99+
fakeMetricsHandler := newFakeMetricsHandler()
100+
rpGRPC := setupReverseGRPCService(10, dummyModelNamePrefix, backEndGRPCPort, rpPort, serverPort, fakeMetricsHandler)
99101
_ = rpGRPC.Start()
100102

101103
t.Log("Testing model found")
@@ -115,28 +117,28 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
115117
}
116118
defer conn.Close()
117119

118-
doInfer := func(modelSuffix string) (*v2.ModelInferResponse, error) {
120+
doInfer := func(modelSuffixInternal, modelSuffix string) (*v2.ModelInferResponse, error) {
119121
client := v2.NewGRPCInferenceServiceClient(conn)
120122
ctx := context.Background()
121-
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
123+
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffixInternal, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
122124
return client.ModelInfer(ctx, &v2.ModelInferRequest{ModelName: dummyModelNamePrefix}) // note without suffix
123125
}
124126

125127
doMeta := func(modelSuffix string) (*v2.ModelMetadataResponse, error) {
126128
client := v2.NewGRPCInferenceServiceClient(conn)
127129
ctx := context.Background()
128-
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
130+
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix)
129131
return client.ModelMetadata(ctx, &v2.ModelMetadataRequest{Name: dummyModelNamePrefix}) // note without suffix
130132
}
131133

132134
doModelReady := func(modelSuffix string) (*v2.ModelReadyResponse, error) {
133135
client := v2.NewGRPCInferenceServiceClient(conn)
134136
ctx := context.Background()
135-
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix)
137+
ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix)
136138
return client.ModelReady(ctx, &v2.ModelReadyRequest{Name: dummyModelNamePrefix}) // note without suffix
137139
}
138140

139-
responseInfer, errInfer := doInfer("_0")
141+
responseInfer, errInfer := doInfer("_0", ".experiment")
140142
g.Expect(errInfer).To(BeNil())
141143
g.Expect(responseInfer.ModelName).To(Equal(dummyModelNamePrefix + "_0"))
142144
g.Expect(responseInfer.ModelVersion).To(Equal("")) // in practice this should be something else
@@ -145,6 +147,11 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
145147
g.Expect(rpGRPC.modelScalingStatsCollector.ModelLagStats.Get(dummyModelNamePrefix + "_0")).To(Equal(uint32(0)))
146148
g.Expect(rpGRPC.modelScalingStatsCollector.ModelLastUsedStats.Get(dummyModelNamePrefix + "_0")).Should(BeNumerically("<=", time.Now().Unix())) // only triggered when we get results back
147149

150+
t.Log("Testing model infer metrics")
151+
g.Expect(fakeMetricsHandler.modelInferState[dummyModelNamePrefix].internalModelName).To(Equal(dummyModelNamePrefix + "_0"))
152+
g.Expect(fakeMetricsHandler.modelInferState[dummyModelNamePrefix].method).To(Equal("grpc"))
153+
g.Expect(fakeMetricsHandler.modelInferState[dummyModelNamePrefix].code).To(Equal("OK")) // note it is not 200 for grpc, should we change this?
154+
148155
responseMeta, errMeta := doMeta("_0")
149156
g.Expect(responseMeta.Name).To(Equal(dummyModelNamePrefix + "_0"))
150157
g.Expect(responseMeta.Versions).To(Equal([]string{""})) // in practice this should be something else
@@ -157,13 +164,13 @@ func TestReverseGRPCServiceSmoke(t *testing.T) {
157164

158165
t.Log("Testing lazy load")
159166
mockMLServerState.setModelServerUnloaded(dummyModelNamePrefix + "_0")
160-
responseInfer, errInfer = doInfer("_0")
167+
responseInfer, errInfer = doInfer("_0", "")
161168
g.Expect(errInfer).To(BeNil())
162169
g.Expect(responseInfer.ModelName).To(Equal(dummyModelNamePrefix + "_0"))
163170
g.Expect(responseInfer.ModelVersion).To(Equal("")) // in practice this should be something else
164171

165172
t.Log("Testing model not found")
166-
_, errInfer = doInfer("_1")
173+
_, errInfer = doInfer("_1", "")
167174
g.Expect(errInfer).NotTo(BeNil())
168175
g.Expect(mockMLServerState.isModelLoaded(dummyModelNamePrefix + "_1")).To(Equal(false))
169176

@@ -184,7 +191,8 @@ func TestReverseGRPCServiceEarlyStop(t *testing.T) {
184191

185192
dummyModelNamePrefix := "dummy_model"
186193

187-
rpGRPC := setupReverseGRPCService(0, dummyModelNamePrefix, 1, 1, 1)
194+
fakeMetricsHandler := newFakeMetricsHandler()
195+
rpGRPC := setupReverseGRPCService(0, dummyModelNamePrefix, 1, 1, 1, fakeMetricsHandler)
188196
err := rpGRPC.Stop()
189197
g.Expect(err).To(BeNil())
190198
ready := rpGRPC.Ready()

‎scheduler/pkg/agent/rproxy_test.go

+73-30
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling"
3232
"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
3333
testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
34+
"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
3435
"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
3536
)
3637

@@ -98,9 +99,16 @@ type loadModelSateValue struct {
9899
isSoft bool
99100
}
100101

102+
type inferModelSateValue struct {
103+
internalModelName string
104+
method string
105+
code string
106+
}
107+
101108
type fakeMetricsHandler struct {
102-
modelLoadState map[string]loadModelSateValue
103-
mu *sync.Mutex
109+
modelLoadState map[string]loadModelSateValue
110+
modelInferState map[string]inferModelSateValue
111+
mu *sync.Mutex
104112
}
105113

106114
func (f fakeMetricsHandler) AddModelHistogramMetricsHandler(baseHandler http.HandlerFunc) http.HandlerFunc {
@@ -112,6 +120,14 @@ func (f fakeMetricsHandler) HttpCodeToString(code int) string {
112120
}
113121

114122
func (f fakeMetricsHandler) AddModelInferMetrics(externalModelName string, internalModelName string, method string, elapsedTime float64, code string) {
123+
f.mu.Lock()
124+
defer f.mu.Unlock()
125+
126+
f.modelInferState[externalModelName] = inferModelSateValue{
127+
internalModelName: internalModelName,
128+
method: method,
129+
code: code,
130+
}
115131
}
116132

117133
func (f fakeMetricsHandler) AddLoadedModelMetrics(internalModelName string, memory uint64, isLoad, isSoft bool) {
@@ -130,8 +146,9 @@ func (f fakeMetricsHandler) AddServerReplicaMetrics(memory uint64, memoryWithOve
130146

131147
func newFakeMetricsHandler() fakeMetricsHandler {
132148
return fakeMetricsHandler{
133-
modelLoadState: map[string]loadModelSateValue{},
134-
mu: &sync.Mutex{},
149+
modelLoadState: map[string]loadModelSateValue{},
150+
modelInferState: map[string]inferModelSateValue{},
151+
mu: &sync.Mutex{},
135152
}
136153
}
137154

@@ -141,7 +158,7 @@ func (f fakeMetricsHandler) UnaryServerInterceptor() func(ctx context.Context, r
141158
}
142159
}
143160

144-
func setupReverseProxy(logger log.FieldLogger, numModels int, modelPrefix string, rpPort, serverPort int) *reverseHTTPProxy {
161+
func setupReverseProxy(logger log.FieldLogger, numModels int, modelPrefix string, rpPort, serverPort int, metricsHandler metrics.AgentMetricsHandler) *reverseHTTPProxy {
145162
v2Client := testing_utils.NewV2RestClientForTest("localhost", serverPort, logger)
146163
localCacheManager := setupLocalTestManager(numModels, modelPrefix, v2Client, numModels-2, 1)
147164
modelScalingStatsCollector := modelscaling.NewDataPlaneStatsCollector(
@@ -153,7 +170,7 @@ func setupReverseProxy(logger log.FieldLogger, numModels int, modelPrefix string
153170
"localhost",
154171
uint(serverPort),
155172
uint(rpPort),
156-
fakeMetricsHandler{},
173+
metricsHandler,
157174
modelScalingStatsCollector,
158175
)
159176
rp.SetState(localCacheManager)
@@ -166,34 +183,51 @@ func TestReverseProxySmoke(t *testing.T) {
166183
logger.SetLevel(log.DebugLevel)
167184

168185
type test struct {
169-
name string
170-
modelToLoad string
171-
modelToRequest string
172-
statusCode int
173-
isLoadedonServer bool
186+
name string
187+
modelToLoad string
188+
modelToRequest string
189+
modelExternalHeader string
190+
expectedModelExternalTag string
191+
statusCode int
192+
isLoadedonServer bool
174193
}
175194

176195
tests := []test{
177196
{
178-
name: "model exists",
179-
modelToLoad: "foo",
180-
modelToRequest: "foo",
181-
statusCode: http.StatusOK,
182-
isLoadedonServer: true,
197+
name: "model exists",
198+
modelToLoad: "foo_1",
199+
modelToRequest: "foo_1",
200+
modelExternalHeader: "foo",
201+
expectedModelExternalTag: "foo",
202+
statusCode: http.StatusOK,
203+
isLoadedonServer: true,
183204
},
184205
{
185-
name: "model exists on agent but not loaded on server",
186-
modelToLoad: "foo",
187-
modelToRequest: "foo",
188-
statusCode: http.StatusOK,
189-
isLoadedonServer: false,
206+
name: "model exists, part of experiment",
207+
modelToLoad: "foo_1",
208+
modelToRequest: "foo_1",
209+
modelExternalHeader: "foo-experiment.experiment",
210+
expectedModelExternalTag: "foo",
211+
statusCode: http.StatusOK,
212+
isLoadedonServer: true,
190213
},
191214
{
192-
name: "model does not exists",
193-
modelToLoad: "foo",
194-
modelToRequest: "foo2",
195-
statusCode: http.StatusNotFound,
196-
isLoadedonServer: false,
215+
name: "model exists on agent but not loaded on server",
216+
modelToLoad: "foo_1",
217+
modelToRequest: "foo_1",
218+
modelExternalHeader: "foo",
219+
expectedModelExternalTag: "foo",
220+
statusCode: http.StatusOK,
221+
isLoadedonServer: false,
222+
},
223+
{
224+
name: "model does not exists",
225+
modelToLoad: "foo_1",
226+
modelToRequest: "foo2_1",
227+
modelExternalHeader: "foo2",
228+
expectedModelExternalTag: "foo2",
229+
statusCode: http.StatusNotFound,
230+
isLoadedonServer: false,
197231
},
198232
}
199233

@@ -217,7 +251,8 @@ func TestReverseProxySmoke(t *testing.T) {
217251
if err != nil {
218252
t.Fatal(err)
219253
}
220-
rpHTTP := setupReverseProxy(logger, 3, test.modelToLoad, rpPort, serverPort)
254+
fakeMetricsHandler := newFakeMetricsHandler()
255+
rpHTTP := setupReverseProxy(logger, 3, test.modelToLoad, rpPort, serverPort, fakeMetricsHandler)
221256
err = rpHTTP.Start()
222257
g.Expect(err).To(BeNil())
223258
time.Sleep(500 * time.Millisecond)
@@ -237,7 +272,7 @@ func TestReverseProxySmoke(t *testing.T) {
237272
req, err := http.NewRequest(http.MethodPost, url, nil)
238273
g.Expect(err).To(BeNil())
239274
req.Header.Set("contentType", "application/json")
240-
req.Header.Set(resources.SeldonModelHeader, test.modelToRequest)
275+
req.Header.Set(resources.SeldonModelHeader, test.modelExternalHeader)
241276
req.Header.Set(resources.SeldonInternalModelHeader, test.modelToRequest)
242277
resp, err := http.DefaultClient.Do(req)
243278
g.Expect(err).To(BeNil())
@@ -254,9 +289,16 @@ func TestReverseProxySmoke(t *testing.T) {
254289
if test.statusCode == http.StatusOK {
255290
g.Expect(rpHTTP.modelScalingStatsCollector.ModelLagStats.Get(test.modelToRequest)).To(Equal(uint32(0)))
256291
g.Expect(rpHTTP.modelScalingStatsCollector.ModelLastUsedStats.Get(test.modelToRequest)).Should(BeNumerically("<=", time.Now().Unix())) // only triggered when we get results back
257-
258292
}
259293

294+
// test infer metrics
295+
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].internalModelName).To(Equal(test.modelToRequest))
296+
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].method).To(Equal("rest"))
297+
if test.statusCode == http.StatusOK {
298+
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].code).To(Equal("200"))
299+
} else {
300+
g.Expect(fakeMetricsHandler.modelInferState[test.expectedModelExternalTag].code).To(Equal("404"))
301+
}
260302
g.Expect(rpHTTP.Ready()).To(BeTrue())
261303
_ = rpHTTP.Stop()
262304
g.Expect(rpHTTP.Ready()).To(BeFalse())
@@ -273,7 +315,8 @@ func TestReverseEarlyStop(t *testing.T) {
273315
logger := log.New()
274316
logger.SetLevel(log.DebugLevel)
275317

276-
rpHTTP := setupReverseProxy(logger, 0, "dummy", 1, 1)
318+
fakeMetricsHandler := newFakeMetricsHandler()
319+
rpHTTP := setupReverseProxy(logger, 0, "dummy", 1, 1, fakeMetricsHandler)
277320
err := rpHTTP.Stop()
278321
g.Expect(err).To(BeNil())
279322
ready := rpHTTP.Ready()

0 commit comments

Comments
 (0)
Failed to load comments.