Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filtermanager: avoid blocked by the conf init #588

Merged
merged 4 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 46 additions & 15 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type FilterManagerConfig struct {
type filterManagerConfig struct {
consumerFiltersEndAt int

initOnce *sync.Once
initFailed bool
initFailure error
initFailedPluginName string

parsed []*model.ParsedFilterConfig
pool *sync.Pool

Expand Down Expand Up @@ -101,6 +106,10 @@ func (conf *filterManagerConfig) Merge(another *filterManagerConfig) *filterMana
// Let's copy fields manually.
cp := initFilterManagerConfig(ns)

if conf.initOnce != nil || another.initOnce != nil {
cp.initOnce = &sync.Once{}
}

cp.enableDebugMode = conf.enableDebugMode
if another.enableDebugMode {
cp.enableDebugMode = true
Expand Down Expand Up @@ -154,18 +163,26 @@ func (conf *filterManagerConfig) Merge(another *filterManagerConfig) *filterMana
}

func (conf *filterManagerConfig) InitOnce() {
for _, fc := range conf.parsed {
config := fc.ParsedConfig
if initer, ok := config.(pkgPlugins.Initer); ok {
fc.InitOnce.Do(func() {
// For now, we have nothing to provide as config callbacks
err := initer.Init(nil)
if err != nil {
fc.Factory = NewInternalErrorFactory(fc.Name, err)
if conf.initOnce == nil {
return
}

conf.initOnce.Do(func() {
for _, fc := range conf.parsed {
config := fc.ParsedConfig
if initer, ok := config.(pkgPlugins.Initer); ok {
fc.InitOnce.Do(func() {
// For now, we have nothing to provide as config callbacks
fc.InitFailure = initer.Init(nil)
})
if fc.InitFailure != nil {
conf.initFailure = fc.InitFailure
conf.initFailedPluginName = fc.Name
conf.initFailed = true
}
})
}
}
}
})
}

func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigCallbackHandler) (interface{}, error) {
Expand Down Expand Up @@ -204,6 +221,7 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC

consumerFiltersEndAt := 0
i := 0
needInit := false

for _, proto := range plugins {
name := proto.Name
Expand Down Expand Up @@ -233,6 +251,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
consumerFiltersEndAt = i + 1
}

if _, ok := config.(pkgPlugins.Initer); ok {
needInit = true
}

if name == "debugMode" {
// we handle this plugin differently, so we can have debug behavior before
// executing this plugin.
Expand All @@ -247,6 +269,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
}
conf.consumerFiltersEndAt = consumerFiltersEndAt

if needInit {
conf.initOnce = &sync.Once{}
}

return conf, nil
}

Expand Down Expand Up @@ -493,15 +519,11 @@ func FilterManagerFactory(c interface{}) capi.StreamFilterFactory {
}
}()

conf.InitOnce()

fm := conf.pool.Get().(*filterManager)
fm.callbacks.FilterCallbackHandler = cb

canSkipMethod := fm.canSkipMethod
if canSkipMethod == nil {
// the `canSkipMethod` can't be initialized in InitOnce,
// as it depends on the filter which is created per request.
canSkipMethod = newSkipMethodsMap()
}

Expand Down Expand Up @@ -565,7 +587,7 @@ func FilterManagerFactory(c interface{}) capi.StreamFilterFactory {

// The skip check is based on the compiled code. So if the DecodeRequest is defined,
// even it is not called, DecodeData will not be skipped. Same as EncodeResponse.
fm.canSkipDecodeHeaders = fm.canSkipMethod["DecodeHeaders"]
fm.canSkipDecodeHeaders = fm.canSkipMethod["DecodeHeaders"] && fm.canSkipMethod["DecodeRequest"] && fm.config.initOnce == nil
fm.canSkipDecodeData = fm.canSkipMethod["DecodeData"] && fm.canSkipMethod["DecodeRequest"]
fm.canSkipEncodeHeaders = fm.canSkipMethod["EncodeHeaders"]
fm.canSkipEncodeData = fm.canSkipMethod["EncodeData"] && fm.canSkipMethod["EncodeResponse"]
Expand Down Expand Up @@ -670,6 +692,15 @@ func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream b
defer m.callbacks.RecoverPanic()
var res api.ResultAction

m.config.InitOnce()
if m.config.initFailed {
api.LogErrorf("error in plugin %s: %s", m.config.initFailedPluginName, m.config.initFailure)
m.localReply(&api.LocalResponse{
Code: 500,
})
return
}

headers := &filterManagerRequestHeaderMap{
RequestHeaderMap: headers,
}
Expand Down
83 changes: 83 additions & 0 deletions api/pkg/filtermanager/filtermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package filtermanager

import (
"errors"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -308,6 +309,88 @@ func (f *setConsumerFilter) DecodeHeaders(headers api.RequestHeaderMap, endStrea
return api.Continue
}

func initFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter {
return &api.PassThroughFilter{}
}

type initConfig struct {
count int
err error
}

func (c *initConfig) Init(cb api.ConfigCallbackHandler) error {
c.count++
return c.err
}

func TestInitFailed(t *testing.T) {
config := initFilterManagerConfig("ns")
config.initOnce = &sync.Once{}
ok := &initConfig{}
bad := &initConfig{
err: errors.New("ouch"),
}
okParsed := &model.ParsedFilterConfig{
Name: "init",
Factory: initFactory,
ParsedConfig: ok,
}
badParsed := &model.ParsedFilterConfig{
Name: "initFailed",
Factory: initFactory,
ParsedConfig: bad,
}

config.parsed = []*model.ParsedFilterConfig{
okParsed,
badParsed,
}
n := 10
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
cb := envoy.NewCAPIFilterCallbackHandler()
m := FilterManagerFactory(config)(cb).(*filterManager)
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, true)
cb.WaitContinued()
r := cb.LocalResponse()
assert.Equal(t, 500, r.Code)

wg.Done()
}(i)
}
wg.Wait()

assert.Equal(t, 1, ok.count)
assert.Equal(t, 1, bad.count)

config2 := initFilterManagerConfig("from_lds")
// simulate config inherited from LDS
config2 = config2.Merge(config)
wg.Add(n)
for i := 0; i < n; i++ {
go func(i int) {
cb := envoy.NewCAPIFilterCallbackHandler()
m := FilterManagerFactory(config2)(cb).(*filterManager)
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, true)
cb.WaitContinued()
r := cb.LocalResponse()
assert.Equal(t, 500, r.Code)

wg.Done()
}(i)
}
wg.Wait()

assert.Equal(t, 1, ok.count)
assert.Equal(t, 1, bad.count)
}

func onLogFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter {
return &onLogFilter{}
}
Expand Down
1 change: 1 addition & 0 deletions api/pkg/filtermanager/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ParsedFilterConfig struct {
Name string
ParsedConfig interface{}
InitOnce sync.Once
InitFailure error
Factory api.FilterFactory
}

Expand Down