Skip to content

Commit

Permalink
fix multi accesslogging not working (#39521)
Browse files Browse the repository at this point in the history
* fix multi accesslogging not working

* fix lint

* fix e2e

* fix UT

* add more UT
  • Loading branch information
zirain committed Aug 11, 2022
1 parent 9c5f437 commit 9acc01d
Show file tree
Hide file tree
Showing 5 changed files with 634 additions and 381 deletions.
148 changes: 95 additions & 53 deletions pilot/pkg/model/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Telemetries struct {
// creation, we will preserve the Telemetries (and thus the cache) if not Telemetries are modified.
// As result, this cache will live until any Telemetry is modified.
computedMetricsFilters map[metricsKey]any
computedLoggingConfig map[loggingKey]LoggingConfig
computedLoggingConfig map[loggingKey][]LoggingConfig
mu sync.Mutex
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func getTelemetries(env *Environment) (*Telemetries, error) {
RootNamespace: env.Mesh().GetRootNamespace(),
meshConfig: env.Mesh(),
computedMetricsFilters: map[metricsKey]any{},
computedLoggingConfig: map[loggingKey]LoggingConfig{},
computedLoggingConfig: map[loggingKey][]LoggingConfig{},
}

fromEnv, err := env.List(collections.IstioTelemetryV1Alpha1Telemetries.Resource().GroupVersionKind(), NamespaceAll)
Expand Down Expand Up @@ -173,10 +173,18 @@ type tagOverride struct {
type computedTelemetries struct {
telemetryKey
Metrics []*tpb.Metrics
Logging []*tpb.AccessLogging
Logging []*computedAccessLogging
Tracing []*tpb.Tracing
}

// computedAccessLogging contains the various AccessLogging configurations in scope for a given proxy,
// include combined configurations for one of the following levels: 1. the root namespace level
// 2. namespace level 3. workload level combined.
type computedAccessLogging struct {
telemetryKey
Logging []*tpb.AccessLogging
}

type TracingConfig struct {
ServerSpec TracingSpec
ClientSpec TracingSpec
Expand All @@ -191,8 +199,8 @@ type TracingSpec struct {
}

type LoggingConfig struct {
Logs []*accesslog.AccessLog
Providers []*meshconfig.MeshConfig_ExtensionProvider
AccessLog *accesslog.AccessLog
Provider *meshconfig.MeshConfig_ExtensionProvider
Filter *tpb.AccessLogging_Filter
}

Expand All @@ -205,7 +213,7 @@ func workloadMode(class networking.ListenerClass) tpb.WorkloadMode {
case networking.ListenerClassSidecarOutbound:
return tpb.WorkloadMode_CLIENT
case networking.ListenerClassUndefined:
// this should not happened, just in case
// this should not happen, just in case
return tpb.WorkloadMode_CLIENT
}

Expand All @@ -215,9 +223,10 @@ func workloadMode(class networking.ListenerClass) tpb.WorkloadMode {
// AccessLogging returns the logging configuration for a given proxy and listener class.
// If nil is returned, access logs are not configured via Telemetry and should use fallback mechanisms.
// If a non-nil but empty configuration is passed, access logging is explicitly disabled.
func (t *Telemetries) AccessLogging(push *PushContext, proxy *Proxy, class networking.ListenerClass) *LoggingConfig {
func (t *Telemetries) AccessLogging(push *PushContext, proxy *Proxy, class networking.ListenerClass) []LoggingConfig {
ct := t.applicableTelemetries(proxy)
if len(ct.Logging) == 0 && len(t.meshConfig.GetDefaultProviders().GetAccessLogging()) == 0 {
// No Telemetry API configured, fall back to legacy mesh config setting
return nil
}

Expand All @@ -229,28 +238,33 @@ func (t *Telemetries) AccessLogging(push *PushContext, proxy *Proxy, class netwo
defer t.mu.Unlock()
precomputed, ok := t.computedLoggingConfig[key]
if ok {
return &precomputed
return precomputed
}

cfg := LoggingConfig{}
providers, f := mergeLogs(ct.Logging, t.meshConfig, workloadMode(class))
cfg.Filter = f
for _, p := range providers.SortedList() {
providers := mergeLogs(ct.Logging, t.meshConfig, workloadMode(class))
cfgs := make([]LoggingConfig, 0, len(providers))
for p, f := range providers {
fp := t.fetchProvider(p)
if fp == nil {
log.Debugf("fail to fetch provider %s", p)
continue
}
cfg := LoggingConfig{
Provider: fp,
Filter: f,
}

cfg.Providers = append(cfg.Providers, fp)
al := telemetryAccessLog(push, fp)
if al == nil {
// stackdriver will be handled in HTTPFilters/TCPFilters
continue
}
cfg.Logs = append(cfg.Logs, al)
cfg.AccessLog = al
cfgs = append(cfgs, cfg)
}

t.computedLoggingConfig[key] = cfg
return &cfg
t.computedLoggingConfig[key] = cfgs
return cfgs
}

// Tracing returns the logging tracing for a given proxy. If nil is returned, tracing
Expand Down Expand Up @@ -366,15 +380,20 @@ func (t *Telemetries) applicableTelemetries(proxy *Proxy) computedTelemetries {
namespace := proxy.ConfigNamespace
// Order here matters. The latter elements will override the first elements
ms := []*tpb.Metrics{}
ls := []*tpb.AccessLogging{}
ls := []*computedAccessLogging{}
ts := []*tpb.Tracing{}
key := telemetryKey{}
if t.RootNamespace != "" {
telemetry := t.namespaceWideTelemetryConfig(t.RootNamespace)
if telemetry != (Telemetry{}) {
key.Root = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace}
ms = append(ms, telemetry.Spec.GetMetrics()...)
ls = append(ls, telemetry.Spec.GetAccessLogging()...)
ls = append(ls, &computedAccessLogging{
telemetryKey: telemetryKey{
Root: key.Root,
},
Logging: telemetry.Spec.GetAccessLogging(),
})
ts = append(ts, telemetry.Spec.GetTracing()...)
}
}
Expand All @@ -384,7 +403,12 @@ func (t *Telemetries) applicableTelemetries(proxy *Proxy) computedTelemetries {
if telemetry != (Telemetry{}) {
key.Namespace = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace}
ms = append(ms, telemetry.Spec.GetMetrics()...)
ls = append(ls, telemetry.Spec.GetAccessLogging()...)
ls = append(ls, &computedAccessLogging{
telemetryKey: telemetryKey{
Namespace: key.Namespace,
},
Logging: telemetry.Spec.GetAccessLogging(),
})
ts = append(ts, telemetry.Spec.GetTracing()...)
}
}
Expand All @@ -398,7 +422,12 @@ func (t *Telemetries) applicableTelemetries(proxy *Proxy) computedTelemetries {
if selector.SubsetOf(proxy.Metadata.Labels) {
key.Workload = NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace}
ms = append(ms, spec.GetMetrics()...)
ls = append(ls, spec.GetAccessLogging()...)
ls = append(ls, &computedAccessLogging{
telemetryKey: telemetryKey{
Workload: NamespacedName{Name: telemetry.Name, Namespace: telemetry.Namespace},
},
Logging: telemetry.Spec.GetAccessLogging(),
})
ts = append(ts, spec.GetTracing()...)
break
}
Expand Down Expand Up @@ -439,11 +468,14 @@ func (t *Telemetries) telemetryFilters(proxy *Proxy, class networking.ListenerCl
tmm := mergeMetrics(c.Metrics, t.meshConfig)
log.Debugf("merged metrics, proxyID: %s metrics: %+v", proxy.ID, tmm)
// Additionally, fetch relevant access logging configurations
tml, logsFilter := mergeLogs(c.Logging, t.meshConfig, workloadMode(class))
tml := mergeLogs(c.Logging, t.meshConfig, workloadMode(class))

// The above result is in a nested map to deduplicate responses. This loses ordering, so we convert to
// a list to retain stable naming
allKeys := sets.New(tml.UnsortedList()...)
allKeys := sets.New()
for k := range tml {
allKeys.Insert(k)
}
for k := range tmm {
allKeys.Insert(k)
}
Expand All @@ -456,12 +488,13 @@ func (t *Telemetries) telemetryFilters(proxy *Proxy, class networking.ListenerCl
}
_, logging := tml[k]
_, metrics := tmm[k]

cfg := telemetryFilterConfig{
Provider: p,
metricsConfig: tmm[k],
AccessLogging: logging,
Metrics: metrics,
LogsFilter: logsFilter,
LogsFilter: tml[p.Name],
}
m = append(m, cfg)
}
Expand All @@ -481,58 +514,67 @@ func (t *Telemetries) telemetryFilters(proxy *Proxy, class networking.ListenerCl
}

// mergeLogs returns the set of providers for the given logging configuration.
func mergeLogs(logs []*tpb.AccessLogging, mesh *meshconfig.MeshConfig, mode tpb.WorkloadMode) (sets.Set, *tpb.AccessLogging_Filter) {
providers := sets.New()
// The provider names are mapped to any applicable access logging filter that has been applied in provider configuration.
func mergeLogs(logs []*computedAccessLogging, mesh *meshconfig.MeshConfig, mode tpb.WorkloadMode) map[string]*tpb.AccessLogging_Filter {
providers := map[string]*tpb.AccessLogging_Filter{}

if len(logs) == 0 {
for _, dp := range mesh.GetDefaultProviders().GetAccessLogging() {
// Insert the default provider.
providers.Insert(dp)
providers[dp] = nil
}
return providers, nil
return providers
}
var loggingFilter *tpb.AccessLogging_Filter
providerNames := mesh.GetDefaultProviders().GetAccessLogging()
filters := map[string]*tpb.AccessLogging_Filter{}
for _, m := range logs {
names := getProviderNames(m.Providers)
if len(names) > 0 {
providerNames = names
names := sets.New()
for _, p := range m.Logging {
subProviders := getProviderNames(p.Providers)
names.InsertAll(subProviders...)

for _, prov := range subProviders {
filters[prov] = p.Filter
}
}

if m.Filter != nil {
loggingFilter = m.Filter
if len(names) > 0 {
providerNames = names.UnsortedList()
}
}
inScopeProviders := sets.New(providerNames...)

parentProviders := mesh.GetDefaultProviders().GetAccessLogging()
for _, m := range logs {
providerNames := getProviderNames(m.Providers)
if len(providerNames) == 0 {
providerNames = parentProviders
}
parentProviders = providerNames
for _, provider := range providerNames {
if !inScopeProviders.Contains(provider) {
// We don't care about this, remove it
// This occurs when a top level provider is later disabled by a lower level
continue
for _, l := range logs {
for _, m := range l.Logging {
providerNames := getProviderNames(m.Providers)
if len(providerNames) == 0 {
providerNames = parentProviders
}
parentProviders = providerNames
for _, provider := range providerNames {
if !inScopeProviders.Contains(provider) {
// We don't care about this, remove it
// This occurs when a top level provider is later disabled by a lower level
continue
}

if !matchWorkloadMode(m.Match, mode) {
continue
}
if !matchWorkloadMode(m.Match, mode) {
continue
}

if m.GetDisabled().GetValue() {
providers.Delete(provider)
continue
}
// see UT: server - multi filters disabled
if m.GetDisabled().GetValue() {
delete(providers, provider)
continue
}

providers.Insert(provider)
providers[provider] = filters[provider]
}
}
}

return providers, loggingFilter
return providers
}

func matchWorkloadMode(selector *tpb.AccessLogging_LogSelector, mode tpb.WorkloadMode) bool {
Expand Down
Loading

0 comments on commit 9acc01d

Please sign in to comment.