Skip to content

Commit

Permalink
filtermanager: avoid blocked by the conf init (#588)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander committed Jun 19, 2024
1 parent e1b10a0 commit cf8c130
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 15 deletions.
61 changes: 46 additions & 15 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type FilterManagerConfig struct {
type filterManagerConfig struct {
consumerFiltersEndAt int

initOnce *sync.Once
initFailed bool
initFailure error
initFailedPluginName string

parsed []*model.ParsedFilterConfig
pool *sync.Pool

Expand Down Expand Up @@ -101,6 +106,10 @@ func (conf *filterManagerConfig) Merge(another *filterManagerConfig) *filterMana
// Let's copy fields manually.
cp := initFilterManagerConfig(ns)

if conf.initOnce != nil || another.initOnce != nil {
cp.initOnce = &sync.Once{}
}

cp.enableDebugMode = conf.enableDebugMode
if another.enableDebugMode {
cp.enableDebugMode = true
Expand Down Expand Up @@ -154,18 +163,26 @@ func (conf *filterManagerConfig) Merge(another *filterManagerConfig) *filterMana
}

func (conf *filterManagerConfig) InitOnce() {
for _, fc := range conf.parsed {
config := fc.ParsedConfig
if initer, ok := config.(pkgPlugins.Initer); ok {
fc.InitOnce.Do(func() {
// For now, we have nothing to provide as config callbacks
err := initer.Init(nil)
if err != nil {
fc.Factory = NewInternalErrorFactory(fc.Name, err)
if conf.initOnce == nil {
return
}

conf.initOnce.Do(func() {
for _, fc := range conf.parsed {
config := fc.ParsedConfig
if initer, ok := config.(pkgPlugins.Initer); ok {
fc.InitOnce.Do(func() {
// For now, we have nothing to provide as config callbacks
fc.InitFailure = initer.Init(nil)
})
if fc.InitFailure != nil {
conf.initFailure = fc.InitFailure
conf.initFailedPluginName = fc.Name
conf.initFailed = true
}
})
}
}
}
})
}

func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigCallbackHandler) (interface{}, error) {
Expand Down Expand Up @@ -204,6 +221,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC

consumerFiltersEndAt := 0
i := 0
needInit := false

for _, proto := range plugins {
name := proto.Name
Expand Down Expand Up @@ -233,6 +251,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
consumerFiltersEndAt = i + 1
}

if _, ok := config.(pkgPlugins.Initer); ok {
needInit = true
}

if name == "debugMode" {
// we handle this plugin differently, so we can have debug behavior before
// executing this plugin.
Expand All @@ -247,6 +269,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
}
conf.consumerFiltersEndAt = consumerFiltersEndAt

if needInit {
conf.initOnce = &sync.Once{}
}

return conf, nil
}

Expand Down Expand Up @@ -493,15 +519,11 @@ func FilterManagerFactory(c interface{}) capi.StreamFilterFactory {
}
}()

conf.InitOnce()

fm := conf.pool.Get().(*filterManager)
fm.callbacks.FilterCallbackHandler = cb

canSkipMethod := fm.canSkipMethod
if canSkipMethod == nil {
// the `canSkipMethod` can't be initialized in InitOnce,
// as it depends on the filter which is created per request.
canSkipMethod = newSkipMethodsMap()
}

Expand Down Expand Up @@ -565,7 +587,7 @@ func FilterManagerFactory(c interface{}) capi.StreamFilterFactory {

// The skip check is based on the compiled code. So if the DecodeRequest is defined,
// even it is not called, DecodeData will not be skipped. Same as EncodeResponse.
fm.canSkipDecodeHeaders = fm.canSkipMethod["DecodeHeaders"]
fm.canSkipDecodeHeaders = fm.canSkipMethod["DecodeHeaders"] && fm.canSkipMethod["DecodeRequest"] && fm.config.initOnce == nil
fm.canSkipDecodeData = fm.canSkipMethod["DecodeData"] && fm.canSkipMethod["DecodeRequest"]
fm.canSkipEncodeHeaders = fm.canSkipMethod["EncodeHeaders"]
fm.canSkipEncodeData = fm.canSkipMethod["EncodeData"] && fm.canSkipMethod["EncodeResponse"]
Expand Down Expand Up @@ -670,6 +692,15 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b
defer m.callbacks.RecoverPanic()
var res api.ResultAction

m.config.InitOnce()
if m.config.initFailed {
api.LogErrorf("error in plugin %s: %s", m.config.initFailedPluginName, m.config.initFailure)
m.localReply(&api.LocalResponse{
Code: 500,
})
return
}

headers := &filterManagerRequestHeaderMap{
RequestHeaderMap: headers,
}
Expand Down
83 changes: 83 additions & 0 deletions api/pkg/filtermanager/filtermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package filtermanager

import (
"errors"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -308,6 +309,88 @@ func (f *setConsumerFilter) DecodeHeaders(headers api.RequestHeaderMap, endStrea
return api.Continue
}

func initFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter {
return &api.PassThroughFilter{}
}

type initConfig struct {
count int
err error
}

func (c *initConfig) Init(cb api.ConfigCallbackHandler) error {
c.count++
return c.err
}

func TestInitFailed(t *testing.T) {
config := initFilterManagerConfig("ns")
config.initOnce = &sync.Once{}
ok := &initConfig{}
bad := &initConfig{
err: errors.New("ouch"),
}
okParsed := &model.ParsedFilterConfig{
Name: "init",
Factory: initFactory,
ParsedConfig: ok,
}
badParsed := &model.ParsedFilterConfig{
Name: "initFailed",
Factory: initFactory,
ParsedConfig: bad,
}

config.parsed = []*model.ParsedFilterConfig{
okParsed,
badParsed,
}
n := 10
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
cb := envoy.NewCAPIFilterCallbackHandler()
m := FilterManagerFactory(config)(cb).(*filterManager)
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, true)
cb.WaitContinued()
r := cb.LocalResponse()
assert.Equal(t, 500, r.Code)

wg.Done()
}(i)
}
wg.Wait()

assert.Equal(t, 1, ok.count)
assert.Equal(t, 1, bad.count)

config2 := initFilterManagerConfig("from_lds")
// simulate config inherited from LDS
config2 = config2.Merge(config)
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
cb := envoy.NewCAPIFilterCallbackHandler()
m := FilterManagerFactory(config2)(cb).(*filterManager)
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, true)
cb.WaitContinued()
r := cb.LocalResponse()
assert.Equal(t, 500, r.Code)

wg.Done()
}(i)
}
wg.Wait()

assert.Equal(t, 1, ok.count)
assert.Equal(t, 1, bad.count)
}

func onLogFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter {
return &onLogFilter{}
}
Expand Down
1 change: 1 addition & 0 deletions api/pkg/filtermanager/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ParsedFilterConfig struct {
Name string
ParsedConfig interface{}
InitOnce sync.Once
InitFailure error
Factory api.FilterFactory
}

Expand Down

0 comments on commit cf8c130

Please sign in to comment.