diff --git a/Makefile.core.mk b/Makefile.core.mk index 7973a5491f1..dc1b1357167 100644 --- a/Makefile.core.mk +++ b/Makefile.core.mk @@ -290,17 +290,19 @@ lint: lint-python lint-copyright-banner lint-scripts lint-go lint-dockerfiles li # (proto) Envoy TLS proto for SDS # (proto) Envoy Wasm filters for wasm xDS proxy # (proto) xDS discovery service for xDS proxy +# (proto) SDS secret and contrib QAT and cryptomb .PHONY: check-agent-deps check-agent-deps: @go list -f '{{ join .Deps "\n" }}' -tags=agent \ - ./pilot/cmd/pilot-agent/app \ + ./pilot/cmd/pilot-agent/... \ ./pkg/istio-agent/... | sort | uniq |\ grep -Pv '^k8s.io/(utils|klog|apimachinery)/' |\ grep -Pv 'envoy/type/|envoy/annotations|envoy/config/core/' |\ grep -Pv 'envoy/extensions/transport_sockets/tls/' |\ - grep -Pv 'envoy/service/discovery/v3' |\ + grep -Pv 'envoy/service/(discovery|secret)/v3' |\ grep -Pv 'envoy/extensions/wasm/' |\ grep -Pv 'envoy/extensions/filters/(http|network)/wasm/' |\ + grep -Pv 'contrib/envoy/extensions/private_key_providers/' |\ grep -Pv 'istio\.io/api/(annotation|label|mcp|mesh|networking|security/v1alpha1|type)' |\ (! grep -P '^k8s.io|^sigs.k8s.io/gateway-api|cel|antlr|jwx/jwk|envoy/|istio.io/api') diff --git a/pilot/pkg/model/context.go b/pilot/pkg/model/context.go index b644e9b5825..7d6b8c58b56 100644 --- a/pilot/pkg/model/context.go +++ b/pilot/pkg/model/context.go @@ -272,14 +272,6 @@ func AnyToUnnamedResources(r []*anypb.Any) Resources { return a } -func ResourcesToAny(r Resources) []*anypb.Any { - a := make([]*anypb.Any, 0, len(r)) - for _, rr := range r { - a = append(a, rr.Resource) - } - return a -} - // XdsUpdates include information about the subset of updated resources. // See for example EDS incremental updates. type XdsUpdates = sets.Set[ConfigKey] diff --git a/pilot/pkg/xds/delta_test.go b/pilot/pkg/xds/delta_test.go index 3bb132eb34d..871187c8181 100644 --- a/pilot/pkg/xds/delta_test.go +++ b/pilot/pkg/xds/delta_test.go @@ -33,6 +33,7 @@ import ( "istio.io/istio/pkg/test/util/assert" "istio.io/istio/pkg/util/sets" "istio.io/istio/pkg/workloadapi" + xdsserver "istio.io/istio/pkg/xds" ) func TestDeltaAds(t *testing.T) { @@ -53,7 +54,7 @@ func TestDeltaAdsClusterUpdate(t *testing.T) { ResourceNamesUnsubscribe: remove, }) nonce = res.Nonce - got := xdstest.MapKeys(xdstest.ExtractLoadAssignments(xdstest.UnmarshalClusterLoadAssignment(t, model.ResourcesToAny(res.Resources)))) + got := xdstest.MapKeys(xdstest.ExtractLoadAssignments(xdstest.UnmarshalClusterLoadAssignment(t, xdsserver.ResourcesToAny(res.Resources)))) if !reflect.DeepEqual(expect, got) { t.Fatalf("expected clusters %v got %v", expect, got) } diff --git a/pilot/pkg/xds/sds_test.go b/pilot/pkg/xds/sds_test.go index 9550dd66828..8d29a213fe9 100644 --- a/pilot/pkg/xds/sds_test.go +++ b/pilot/pkg/xds/sds_test.go @@ -41,6 +41,7 @@ import ( "istio.io/istio/pkg/spiffe" "istio.io/istio/pkg/test/env" "istio.io/istio/pkg/util/sets" + xdsserver "istio.io/istio/pkg/xds" ) func makeSecret(name string, data map[string]string) *corev1.Secret { @@ -329,7 +330,7 @@ func TestGenerateSDS(t *testing.T) { gen := s.Discovery.Generators[v3.SecretType] tt.request.Start = time.Now() secrets, _, _ := gen.Generate(s.SetupProxy(tt.proxy), &model.WatchedResource{ResourceNames: tt.resources}, tt.request) - raw := xdstest.ExtractTLSSecrets(t, model.ResourcesToAny(secrets)) + raw := xdstest.ExtractTLSSecrets(t, xdsserver.ResourcesToAny(secrets)) got := map[string]Expected{} for _, scrt := range raw { @@ -375,14 +376,14 @@ func TestCaching(t *testing.T) { } secrets, _, _ := gen.Generate(s.SetupProxy(istiosystem), &model.WatchedResource{ResourceNames: []string{"kubernetes://generic"}}, fullPush) - raw := xdstest.ExtractTLSSecrets(t, model.ResourcesToAny(secrets)) + raw := xdstest.ExtractTLSSecrets(t, xdsserver.ResourcesToAny(secrets)) if len(raw) != 1 { t.Fatalf("failed to get expected secrets for authorized proxy: %v", raw) } // We should not get secret returned, even though we are asking for the same one secrets, _, _ = gen.Generate(s.SetupProxy(otherNamespace), &model.WatchedResource{ResourceNames: []string{"kubernetes://generic"}}, fullPush) - raw = xdstest.ExtractTLSSecrets(t, model.ResourcesToAny(secrets)) + raw = xdstest.ExtractTLSSecrets(t, xdsserver.ResourcesToAny(secrets)) if len(raw) != 0 { t.Fatalf("failed to get expected secrets for unauthorized proxy: %v", raw) } @@ -423,7 +424,7 @@ func TestPrivateKeyProviderProxyConfig(t *testing.T) { gen := s.Discovery.Generators[v3.SecretType] fullPush := &model.PushRequest{Full: true, Start: time.Now()} secrets, _, _ := gen.Generate(s.SetupProxy(rawProxy), &model.WatchedResource{ResourceNames: []string{"kubernetes://generic"}}, fullPush) - raw := xdstest.ExtractTLSSecrets(t, model.ResourcesToAny(secrets)) + raw := xdstest.ExtractTLSSecrets(t, xdsserver.ResourcesToAny(secrets)) for _, scrt := range raw { if scrt.GetTlsCertificate().GetPrivateKeyProvider() != nil { t.Fatalf("expect no private key provider in secret") @@ -432,7 +433,7 @@ func TestPrivateKeyProviderProxyConfig(t *testing.T) { // add private key provider in proxy-config secrets, _, _ = gen.Generate(s.SetupProxy(pkpProxy), &model.WatchedResource{ResourceNames: []string{"kubernetes://generic"}}, fullPush) - raw = xdstest.ExtractTLSSecrets(t, model.ResourcesToAny(secrets)) + raw = xdstest.ExtractTLSSecrets(t, xdsserver.ResourcesToAny(secrets)) for _, scrt := range raw { privateKeyProvider := scrt.GetTlsCertificate().GetPrivateKeyProvider() if privateKeyProvider == nil { @@ -445,7 +446,7 @@ func TestPrivateKeyProviderProxyConfig(t *testing.T) { // erase private key provider in proxy-config secrets, _, _ = gen.Generate(s.SetupProxy(rawProxy), &model.WatchedResource{ResourceNames: []string{"kubernetes://generic"}}, fullPush) - raw = xdstest.ExtractTLSSecrets(t, model.ResourcesToAny(secrets)) + raw = xdstest.ExtractTLSSecrets(t, xdsserver.ResourcesToAny(secrets)) for _, scrt := range raw { if scrt.GetTlsCertificate().GetPrivateKeyProvider() != nil { t.Fatalf("expect no private key provider in secret") diff --git a/pilot/pkg/xds/xdsgen.go b/pilot/pkg/xds/xdsgen.go index a4ae7b7c51b..7b5a4cb6e2a 100644 --- a/pilot/pkg/xds/xdsgen.go +++ b/pilot/pkg/xds/xdsgen.go @@ -144,7 +144,7 @@ func (s *DiscoveryServer) pushXds(con *Connection, w *model.WatchedResource, req // TODO: send different version for incremental eds VersionInfo: req.Push.PushVersion, Nonce: nonce(req.Push.LedgerVersion), - Resources: model.ResourcesToAny(res), + Resources: xds.ResourcesToAny(res), } configSize := ResourceSize(res) diff --git a/pkg/xds/server.go b/pkg/xds/server.go index 8d39f4c7563..3615c507a45 100644 --- a/pkg/xds/server.go +++ b/pkg/xds/server.go @@ -22,6 +22,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/anypb" "istio.io/istio/pilot/pkg/features" istiogrpc "istio.io/istio/pilot/pkg/grpc" @@ -45,6 +46,14 @@ func (rd ResourceDelta) IsEmpty() bool { type Resources = []*discovery.Resource +func ResourcesToAny(r Resources) []*anypb.Any { + a := make([]*anypb.Any, 0, len(r)) + for _, rr := range r { + a = append(a, rr.Resource) + } + return a +} + // WatchedResource tracks an active DiscoveryRequest subscription. type WatchedResource struct { // TypeUrl is copied from the DiscoveryRequest.TypeUrl that initiated watching this resource. diff --git a/security/pkg/nodeagent/sds/sdsservice.go b/security/pkg/nodeagent/sds/sdsservice.go index cdb2269a0f0..660969d3ffe 100644 --- a/security/pkg/nodeagent/sds/sdsservice.go +++ b/security/pkg/nodeagent/sds/sdsservice.go @@ -18,6 +18,9 @@ package sds import ( "context" "fmt" + "strconv" + "sync" + "sync/atomic" "time" cryptomb "github.com/envoyproxy/go-control-plane/contrib/envoy/extensions/private_key_providers/cryptomb/v3alpha" @@ -26,26 +29,20 @@ import ( tls "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" sds "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3" - "golang.org/x/time/rate" + "github.com/google/uuid" + uberatomic "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" mesh "istio.io/api/mesh/v1alpha1" - "istio.io/istio/pilot/pkg/config/memory" - "istio.io/istio/pilot/pkg/model" - "istio.io/istio/pilot/pkg/serviceregistry/aggregate" "istio.io/istio/pilot/pkg/util/protoconv" - "istio.io/istio/pilot/pkg/xds" - v3 "istio.io/istio/pilot/pkg/xds/v3" "istio.io/istio/pkg/backoff" - meshcfg "istio.io/istio/pkg/config/mesh" - "istio.io/istio/pkg/config/schema/collection" - "istio.io/istio/pkg/config/schema/kind" "istio.io/istio/pkg/log" + "istio.io/istio/pkg/model" "istio.io/istio/pkg/security" - "istio.io/istio/pkg/util/sets" + "istio.io/istio/pkg/xds" ) var sdsServiceLog = log.RegisterScope("sds", "SDS service debugging") @@ -53,67 +50,23 @@ var sdsServiceLog = log.RegisterScope("sds", "SDS service debugging") type sdsservice struct { st security.SecretManager - XdsServer *xds.DiscoveryServer stop chan struct{} rootCaPath string pkpConf *mesh.PrivateKeyProvider -} - -// Assert we implement the generator interface -var _ model.XdsResourceGenerator = &sdsservice{} - -// NewXdsServer builds a minimal DiscoveryServer for istio-agent SDS only. -// The DiscoveryServer was originally designed for Istiod, so a lot of this is plugging in fake empty information. -func NewXdsServer(stop chan struct{}, gen model.XdsResourceGenerator) *xds.DiscoveryServer { - // Setup a mostly empty NewEnvironment - env := model.NewEnvironment() - env.ServiceDiscovery = aggregate.NewController(aggregate.Options{}) - env.ConfigStore = memory.Make(collection.NewSchemasBuilder().Build()) - env.Watcher = meshcfg.NewFixedWatcher(meshcfg.DefaultMeshConfig()) - env.PushContext().Mesh = env.Watcher.Mesh() - env.Init() - - s := xds.NewDiscoveryServer(env, map[string]string{}) - - // No ratelimit for SDS calls in agent. - s.RequestRateLimit = rate.NewLimiter(0, 1) - s.Generators = map[string]model.XdsResourceGenerator{ - v3.SecretType: gen, - } - s.ProxyNeedsPush = func(proxy *model.Proxy, req *model.PushRequest) bool { - // Empty changes means "all" - if len(req.ConfigsUpdated) == 0 { - sdsServiceLog.Debugf("Proxy %s needs push all") - return true - } - var resources []string - proxy.RLock() - if proxy.WatchedResources[v3.SecretType] != nil { - resources = proxy.WatchedResources[v3.SecretType].ResourceNames - } - proxy.RUnlock() - - if resources == nil { - sdsServiceLog.Debugf("Skipping push for proxy %s, no resources", proxy.ID) - return false - } - names := sets.New(resources...) - found := false - for name := range model.ConfigsOfKind(req.ConfigsUpdated, kind.Secret) { - if names.Contains(name.Name) { - found = true - break - } - } + sync.Mutex + clients map[string]*Context +} - sdsServiceLog.Debugf("Proxy %s needs push %v, names: %v request: %v", proxy.ID, found, names, req) +type Context struct { + BaseConnection xds.Connection + s *sdsservice + w *Watch +} - return found - } - s.CachesSynced() - s.Start(stop) - return s +type Watch struct { + sync.Mutex + watch *xds.WatchedResource } // newSDSService creates Secret Discovery Service which implements envoy SDS API. @@ -122,8 +75,8 @@ func newSDSService(st security.SecretManager, options *security.Options, pkpConf st: st, stop: make(chan struct{}), pkpConf: pkpConf, + clients: make(map[string]*Context), } - ret.XdsServer = NewXdsServer(ret.stop, ret) ret.rootCaPath = options.CARootPath @@ -168,8 +121,10 @@ func newSDSService(st security.SecretManager, options *security.Options, pkpConf return ret } -func (s *sdsservice) generate(resourceNames []string) (model.Resources, error) { - resources := model.Resources{} +var version uberatomic.Uint64 + +func (s *sdsservice) generate(resourceNames []string) (*discovery.DiscoveryResponse, error) { + resources := xds.Resources{} for _, resourceName := range resourceNames { secret, err := s.st.GenerateSecret(resourceName) if err != nil { @@ -188,38 +143,138 @@ func (s *sdsservice) generate(resourceNames []string) (model.Resources, error) { Resource: res, }) } - return resources, nil + return &discovery.DiscoveryResponse{ + TypeUrl: model.SecretType, + VersionInfo: time.Now().Format(time.RFC3339) + "/" + strconv.FormatUint(version.Inc(), 10), + Nonce: uuid.New().String(), + Resources: xds.ResourcesToAny(resources), + }, nil } -// Generate implements the XDS Generator interface. This allows the XDS server to dispatch requests -// for SecretTypeV3 to our server to generate the Envoy response. -func (s *sdsservice) Generate(proxy *model.Proxy, w *model.WatchedResource, updates *model.PushRequest) (model.Resources, model.XdsLogDetails, error) { - // updates.Full indicates we should do a complete push of all updated resources - // In practice, all pushes should be incremental (ie, if the `default` cert changes we won't push - // all file certs). - if updates.Full { - resp, err := s.generate(w.ResourceNames) - return resp, pushLog(w.ResourceNames), err +// register adds the SDS handle to the grpc server +func (s *sdsservice) register(rpcs *grpc.Server) { + sds.RegisterSecretDiscoveryServiceServer(rpcs, s) +} + +func (s *sdsservice) push(secretName string) { + s.Lock() + defer s.Unlock() + for _, client := range s.clients { + go func(client *Context) { + select { + case client.XdsConnection().PushCh() <- secretName: + case <-client.XdsConnection().StreamDone(): + } + }(client) } - names := []string{} - watched := sets.New(w.ResourceNames...) - for i := range updates.ConfigsUpdated { - if i.Kind == kind.Secret && watched.Contains(i.Name) { - names = append(names, i.Name) +} + +func (c Context) XdsConnection() *xds.Connection { + return &c.BaseConnection +} + +var connectionNumber = int64(0) + +func (c *Context) Initialize(_ *core.Node) error { + id := atomic.AddInt64(&connectionNumber, 1) + con := c.XdsConnection() + con.SetID(strconv.FormatInt(id, 10)) + + c.s.Lock() + c.s.clients[con.ID()] = c + c.s.Unlock() + + con.MarkInitialized() + return nil +} + +func (c *Context) Close() { + c.s.Lock() + defer c.s.Unlock() + delete(c.s.clients, c.XdsConnection().ID()) +} + +func (c *Context) Watcher() xds.Watcher { + return c.w +} + +func (w *Watch) DeleteWatchedResource(string) { + w.Lock() + defer w.Unlock() + w.watch = nil +} + +func (w *Watch) GetWatchedResource(string) *xds.WatchedResource { + w.Lock() + defer w.Unlock() + return w.watch +} + +func (w *Watch) NewWatchedResource(typeURL string, names []string) { + w.Lock() + defer w.Unlock() + w.watch = &xds.WatchedResource{TypeUrl: typeURL, ResourceNames: names} +} + +func (w *Watch) UpdateWatchedResource(_ string, f func(*xds.WatchedResource) *xds.WatchedResource) { + w.Lock() + defer w.Unlock() + w.watch = f(w.watch) +} + +func (w *Watch) GetID() string { + // This always maps to the same local Envoy instance. + return "" +} + +func (w *Watch) requested(secretName string) bool { + w.Lock() + defer w.Unlock() + if w.watch != nil { + for _, res := range w.watch.ResourceNames { + if res == secretName { + return true + } } } - resp, err := s.generate(names) - return resp, pushLog(names), err + return false } -// register adds the SDS handle to the grpc server -func (s *sdsservice) register(rpcs *grpc.Server) { - sds.RegisterSecretDiscoveryServiceServer(rpcs, s) +func (c *Context) Process(req *discovery.DiscoveryRequest) error { + shouldRespond, delta := xds.ShouldRespond(c.Watcher(), c.XdsConnection().ID(), req) + if !shouldRespond { + return nil + } + resources := req.ResourceNames + if !delta.IsEmpty() { + resources = delta.Subscribed.UnsortedList() + } + res, err := c.s.generate(resources) + if err != nil { + return err + } + return xds.Send(c, res) +} + +func (c *Context) Push(ev any) error { + secretName := ev.(string) + if !c.w.requested(secretName) { + return nil + } + res, err := c.s.generate([]string{secretName}) + if err != nil { + return err + } + return xds.Send(c, res) } // StreamSecrets serves SDS discovery requests and SDS push requests func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecretsServer) error { - return s.XdsServer.Stream(stream) + return xds.Stream(&Context{ + BaseConnection: xds.NewConnection("", stream), + s: s, + w: &Watch{}, + }) } func (s *sdsservice) DeltaSecrets(stream sds.SecretDiscoveryService_DeltaSecretsServer) error { @@ -232,7 +287,6 @@ func (s *sdsservice) FetchSecrets(ctx context.Context, discReq *discovery.Discov func (s *sdsservice) Close() { close(s.stop) - s.XdsServer.Shutdown() } // toEnvoySecret converts a security.SecretItem to an Envoy tls.Secret @@ -330,11 +384,3 @@ func toEnvoySecret(s *security.SecretItem, caRootPath string, pkpConf *mesh.Priv } return secret } - -func pushLog(names []string) model.XdsLogDetails { - if len(names) == 1 { - // For common case of single resource, show which resource it was - return model.XdsLogDetails{AdditionalInfo: "resource:" + names[0]} - } - return model.DefaultXdsLogDetails -} diff --git a/security/pkg/nodeagent/sds/server.go b/security/pkg/nodeagent/sds/server.go index fa6498a4637..a0bc712c3da 100644 --- a/security/pkg/nodeagent/sds/server.go +++ b/security/pkg/nodeagent/sds/server.go @@ -22,11 +22,8 @@ import ( "google.golang.org/grpc" mesh "istio.io/api/mesh/v1alpha1" - "istio.io/istio/pilot/pkg/model" - "istio.io/istio/pkg/config/schema/kind" "istio.io/istio/pkg/security" "istio.io/istio/pkg/uds" - "istio.io/istio/pkg/util/sets" ) const ( @@ -58,11 +55,7 @@ func (s *Server) OnSecretUpdate(resourceName string) { } sdsServiceLog.Debugf("Trigger on secret update, resource name: %s", resourceName) - s.workloadSds.XdsServer.Push(&model.PushRequest{ - Full: false, - ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.Secret, Name: resourceName}), - Reason: model.NewReasonStats(model.SecretTrigger), - }) + s.workloadSds.push(resourceName) } // Stop closes the gRPC server and debug server. diff --git a/tests/binary/binaries_test.go b/tests/binary/binaries_test.go index eea411f7348..1b2b0d06a5c 100644 --- a/tests/binary/binaries_test.go +++ b/tests/binary/binaries_test.go @@ -107,7 +107,7 @@ func TestBinarySizes(t *testing.T) { // TODO: shrink the ranges here once the active work to reduce binary size is complete // For now, having two small a range will result in lots of "merge conflicts" "istioctl": {60, 85}, - "pilot-agent": {30, 37}, + "pilot-agent": {20, 28}, // TODO(https://github.com/kubernetes/kubernetes/issues/101384) bump this down a bit? "pilot-discovery": {60, 85}, "bug-report": {60, 80}, diff --git a/tests/integration/pilot/piggyback_test.go b/tests/integration/pilot/piggyback_test.go index d7edafd833b..6a4f5a19e77 100644 --- a/tests/integration/pilot/piggyback_test.go +++ b/tests/integration/pilot/piggyback_test.go @@ -22,14 +22,10 @@ import ( "strings" "testing" - discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - - "istio.io/istio/pilot/pkg/xds" "istio.io/istio/pkg/test/framework" "istio.io/istio/pkg/test/framework/components/echo" "istio.io/istio/pkg/test/framework/components/istioctl" "istio.io/istio/pkg/test/util/retry" - "istio.io/istio/pkg/util/protomarshal" ) func TestPiggyback(t *testing.T) { @@ -44,31 +40,6 @@ func TestPiggyback(t *testing.T) { for _, workload := range workloads { podName := workload[0].WorkloadsOrFail(t)[0].PodName() namespace := workload.Config().Namespace.Name() - // Add retry loop to handle case when the pod has disconnected from Istio temporarily - retry.UntilSuccessOrFail(t, func() error { - out, _, err := t.Clusters()[0].PodExec( - podName, - workload.Config().Namespace.Name(), - "istio-proxy", - "pilot-agent request --debug-port 15004 GET /debug/syncz") - if err != nil { - return fmt.Errorf("couldn't curl sidecar: %v", err) - } - dr := discovery.DiscoveryResponse{} - if err := protomarshal.Unmarshal([]byte(out), &dr); err != nil { - return fmt.Errorf("unmarshal: %v", err) - } - if dr.TypeUrl != xds.TypeDebugSyncronization { - return fmt.Errorf("the output doesn't contain expected typeURL: %s", out) - } - if len(dr.Resources) < 1 { - return fmt.Errorf("the output didn't unmarshal as expected (no resources): %s", out) - } - if dr.Resources[0].TypeUrl != "type.googleapis.com/envoy.service.status.v3.ClientConfig" { - return fmt.Errorf("resources[0] doesn't contain expected typeURL: %s", out) - } - return nil - }) retry.UntilSuccessOrFail(t, func() error { args := []string{