Skip to content

Commit

Permalink
Reduce allocations to store config on context in Activator (#11013)
Browse files Browse the repository at this point in the history
* Reduce allocations to store config on context in Activator

The current store implementation has 5 allocations in its HTTP Middleware

- 2 in the call for `r.WithContext(ctx)`
- 1 to allocate the new value context
- 1 to allocate a new *Config
- 1 to load the untyped config

This reduces the footprint of the store to just one allocation by

1. Storing the intermediate Config until it needs to be updated.
2. Collapsing `r.WithContext(ctx)` calls with the context handler.

* Fix optical illusion
  • Loading branch information
markusthoemmes committed Mar 24, 2021
1 parent 1a0d326 commit d5d489c
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 51 deletions.
3 changes: 1 addition & 2 deletions cmd/activator/main.go
Expand Up @@ -178,7 +178,6 @@ func main() {
ah := activatorhandler.New(ctx, throttler, transport, logger)
ah = concurrencyReporter.Handler(ah)
ah = tracing.HTTPSpanMiddleware(ah)
ah = configStore.HTTPMiddleware(ah)
reqLogHandler, err := pkghttp.NewRequestLogHandler(ah, logging.NewSyncFileWriter(os.Stdout), "",
requestLogTemplateInputGetter, false /*enableProbeRequestLog*/)
if err != nil {
Expand All @@ -189,7 +188,7 @@ func main() {
// NOTE: MetricHandler is being used as the outermost handler of the meaty bits. We're not interested in measuring
// the healthchecks or probes.
ah = activatorhandler.NewMetricHandler(env.PodName, ah)
ah = activatorhandler.NewContextHandler(ctx, ah)
ah = activatorhandler.NewContextHandler(ctx, ah, configStore)

// Network probe handlers.
ah = &activatorhandler.ProbeHandler{NextHandler: ah}
Expand Down
65 changes: 23 additions & 42 deletions pkg/activator/config/store.go
Expand Up @@ -18,8 +18,8 @@ package config

import (
"context"
"net/http"

"go.uber.org/atomic"
"knative.dev/pkg/configmap"
tracingconfig "knative.dev/pkg/tracing/config"
)
Expand All @@ -36,57 +36,38 @@ func FromContext(ctx context.Context) *Config {
return ctx.Value(cfgKey{}).(*Config)
}

func toContext(ctx context.Context, c *Config) context.Context {
return context.WithValue(ctx, cfgKey{}, c)
}

// Store loads/unloads our untyped configuration.
// +k8s:deepcopy-gen=false
type Store struct {
*configmap.UntypedStore

// current is the current Config.
current atomic.Value
}

// NewStore creates a new configuration Store.
func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value interface{})) *Store {
return &Store{
UntypedStore: configmap.NewUntypedStore(
"activator",
logger,
configmap.Constructors{
tracingconfig.ConfigName: tracingconfig.NewTracingConfigFromConfigMap,
},
onAfterStore...,
),
}
s := &Store{}

// Append an update function to run after a ConfigMap has updated to update the
// current state of the Config.
onAfterStore = append(onAfterStore, func(_ string, _ interface{}) {
s.current.Store(&Config{
Tracing: s.UntypedLoad(tracingconfig.ConfigName).(*tracingconfig.Config).DeepCopy(),
})
})
s.UntypedStore = configmap.NewUntypedStore(
"activator",
logger,
configmap.Constructors{
tracingconfig.ConfigName: tracingconfig.NewTracingConfigFromConfigMap,
},
onAfterStore...,
)
return s
}

// ToContext stores the configuration Store in the passed context.
func (s *Store) ToContext(ctx context.Context) context.Context {
return toContext(ctx, s.Load())
}

// Load creates a Config for this store.
func (s *Store) Load() *Config {
return &Config{
Tracing: s.UntypedLoad(tracingconfig.ConfigName).(*tracingconfig.Config).DeepCopy(),
}
}

type storeMiddleware struct {
store *Store
next http.Handler
}

// ServeHTTP injects Config in to the context of the http request r.
func (mw *storeMiddleware) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := mw.store.ToContext(r.Context())
mw.next.ServeHTTP(w, r.WithContext(ctx))
}

// HTTPMiddleware is a middleware which stores the current config store in the request context.
func (s *Store) HTTPMiddleware(next http.Handler) http.Handler {
return &storeMiddleware{
store: s,
next: next,
}
return context.WithValue(ctx, cfgKey{}, s.current.Load())
}
89 changes: 89 additions & 0 deletions pkg/activator/config/store_test.go
@@ -0,0 +1,89 @@
/*
Copyright 2021 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
"context"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ltesting "knative.dev/pkg/logging/testing"
tracingconfig "knative.dev/pkg/tracing/config"
)

var tracingConfig = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "knative-serving",
Name: "config-tracing",
},
Data: map[string]string{
"backend": "none",
},
}

func TestStore(t *testing.T) {
logger := ltesting.TestLogger(t)
store := NewStore(logger)
store.OnConfigChanged(tracingConfig)

ctx := store.ToContext(context.Background())
cfg := FromContext(ctx)

if got, want := cfg.Tracing.Backend, tracingconfig.None; got != want {
t.Fatalf("Tracing.Backend = %v, want %v", got, want)
}

newConfig := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: "knative-serving",
Name: "config-tracing",
},
Data: map[string]string{
"backend": "zipkin",
"zipkin-endpoint": "foo.bar",
},
}
store.OnConfigChanged(newConfig)

ctx = store.ToContext(context.Background())
cfg = FromContext(ctx)

if got, want := cfg.Tracing.Backend, tracingconfig.Zipkin; got != want {
t.Fatalf("Tracing.Backend = %v, want %v", got, want)
}
}

func BenchmarkStoreToContext(b *testing.B) {
logger := ltesting.TestLogger(b)
store := NewStore(logger)
store.OnConfigChanged(tracingConfig)

b.Run("sequential", func(b *testing.B) {
for j := 0; j < b.N; j++ {
store.ToContext(context.Background())
}
})

b.Run("parallel", func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
store.ToContext(context.Background())
}
})
})
}
7 changes: 5 additions & 2 deletions pkg/activator/handler/context_handler.go
Expand Up @@ -30,17 +30,19 @@ import (
"knative.dev/pkg/logging/logkey"
network "knative.dev/pkg/network"
"knative.dev/serving/pkg/activator"
activatorconfig "knative.dev/serving/pkg/activator/config"
revisioninformer "knative.dev/serving/pkg/client/injection/informers/serving/v1/revision"
servinglisters "knative.dev/serving/pkg/client/listers/serving/v1"
)

// NewContextHandler creates a handler that extracts the necessary context from the request
// and makes it available on the request's context.
func NewContextHandler(ctx context.Context, next http.Handler) http.Handler {
func NewContextHandler(ctx context.Context, next http.Handler, store *activatorconfig.Store) http.Handler {
return &contextHandler{
nextHandler: next,
revisionLister: revisioninformer.Get(ctx).Lister(),
logger: logging.FromContext(ctx),
store: store,
}
}

Expand All @@ -49,6 +51,7 @@ type contextHandler struct {
revisionLister servinglisters.RevisionLister
logger *zap.SugaredLogger
nextHandler http.Handler
store *activatorconfig.Store
}

func (h *contextHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -75,7 +78,7 @@ func (h *contextHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
ctx = WithRevisionAndID(ctx, revision, revID)

ctx = h.store.ToContext(ctx)
h.nextHandler.ServeHTTP(w, r.WithContext(ctx))
}

Expand Down
10 changes: 7 additions & 3 deletions pkg/activator/handler/context_handler_test.go
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/apimachinery/pkg/types"

"knative.dev/pkg/logging"
network "knative.dev/pkg/network"
rtesting "knative.dev/pkg/reconciler/testing"
"knative.dev/serving/pkg/activator"
Expand All @@ -36,6 +37,7 @@ func TestContextHandler(t *testing.T) {
revID := types.NamespacedName{Namespace: testNamespace, Name: testRevName}
revision := revision(revID.Namespace, revID.Name)
revisionInformer(ctx, revision)
configStore := setupConfigStore(t, logging.FromContext(ctx))

baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if got := RevisionFrom(r.Context()); got != revision {
Expand All @@ -47,7 +49,7 @@ func TestContextHandler(t *testing.T) {
}
})

handler := NewContextHandler(ctx, baseHandler)
handler := NewContextHandler(ctx, baseHandler, configStore)

t.Run("with headers", func(t *testing.T) {
resp := httptest.NewRecorder()
Expand Down Expand Up @@ -78,10 +80,11 @@ func TestContextHandlerError(t *testing.T) {
revID := types.NamespacedName{Namespace: testNamespace, Name: testRevName}
revision := revision(revID.Namespace, revID.Name)
revisionInformer(ctx, revision)
configStore := setupConfigStore(t, logging.FromContext(ctx))

baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})

handler := NewContextHandler(ctx, baseHandler)
handler := NewContextHandler(ctx, baseHandler, configStore)
resp := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", bytes.NewBufferString(""))
req.Header.Set(activator.RevisionHeaderNamespace, "foospace")
Expand Down Expand Up @@ -112,10 +115,11 @@ func BenchmarkContextHandler(b *testing.B) {
defer cancel()
revision := revision(testNamespace, testRevName)
revisionInformer(ctx, revision)
configStore := setupConfigStore(&testing.T{}, logging.FromContext(ctx))

baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})

handler := NewContextHandler(ctx, baseHandler)
handler := NewContextHandler(ctx, baseHandler, configStore)
req := httptest.NewRequest(http.MethodGet, "http://example.com", nil)
req.Header.Set(activator.RevisionHeaderNamespace, testNamespace)

Expand Down
3 changes: 1 addition & 2 deletions pkg/activator/handler/main_test.go
Expand Up @@ -73,10 +73,9 @@ func BenchmarkHandlerChain(b *testing.B) {
ah := New(ctx, fakeThrottler{}, rt, logger)
ah = concurrencyReporter.Handler(ah)
ah = tracing.HTTPSpanMiddleware(ah)
ah = configStore.HTTPMiddleware(ah)
ah, _ = pkghttp.NewRequestLogHandler(ah, ioutil.Discard, "", nil, false)
ah = NewMetricHandler(activatorPodName, ah)
ah = NewContextHandler(ctx, ah)
ah = NewContextHandler(ctx, ah, configStore)
ah = &ProbeHandler{NextHandler: ah}
ah = network.NewProbeHandler(ah)
ah = &HealthHandler{HealthCheck: func() error { return nil }, NextHandler: ah, Logger: logger}
Expand Down

0 comments on commit d5d489c

Please sign in to comment.