From 0bdf2043a0b1935eba71df0e82ed7faf86cd4bcd Mon Sep 17 00:00:00 2001 From: James Peach Date: Wed, 28 Oct 2020 15:57:42 +1100 Subject: [PATCH] internal/xds: convert v3 protobufs automatically Force support for xDS v3 by mapping the v3 endpoints to the internal v2 resource caches. Since the xDS upgrade is wire-compatible, we can force an initial upgrade my rewriting the type URLs of embedded messages and forcing v3 in the dynamic resource configuration. This updates #1898. Signed-off-by: James Peach --- cmd/contour/serve.go | 9 +- examples/contour/03-envoy.yaml | 1 + examples/render/contour.yaml | 1 + internal/protobuf/helpers.go | 6 ++ internal/xds/util.go | 183 +++++++++++++++++++++++++++++++++ internal/xds/v2/contour.go | 2 +- internal/xds/v3/contour.go | 25 +++-- 7 files changed, 219 insertions(+), 8 deletions(-) diff --git a/cmd/contour/serve.go b/cmd/contour/serve.go index a01967ca68e..af64f285bad 100644 --- a/cmd/contour/serve.go +++ b/cmd/contour/serve.go @@ -654,8 +654,15 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error { case config.EnvoyServerType: contour_xds_v2.RegisterServer(envoy_server_v2.NewServer(context.Background(), snapshotCache, nil), grpcServer) case config.ContourServerType: - contour_xds_v2.RegisterServer(contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer) contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer) + + // Check an internal feature flag to disable xDS v2 endpoints. This is strictly for testing. + if config.GetenvOr("CONTOUR_INTERNAL_DISABLE_XDSV2", "N") != "N" { + contour_xds_v2.RegisterServer(contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer) + } + default: + // This can't happen due to config validation. + log.Fatalf("invalid xdsServerType %q configured", ctx.Config.Server.XDSServerType) } addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort)) diff --git a/examples/contour/03-envoy.yaml b/examples/contour/03-envoy.yaml index bfa1b8d306f..67e0fd04044 100644 --- a/examples/contour/03-envoy.yaml +++ b/examples/contour/03-envoy.yaml @@ -99,6 +99,7 @@ spec: - /config/envoy.json - --xds-address=contour - --xds-port=8001 + - --xds-resource-version=v3 - --resources-dir=/config/resources - --envoy-cafile=/certs/ca.crt - --envoy-cert-file=/certs/tls.crt diff --git a/examples/render/contour.yaml b/examples/render/contour.yaml index a42a690b858..f247190e46e 100644 --- a/examples/render/contour.yaml +++ b/examples/render/contour.yaml @@ -1849,6 +1849,7 @@ spec: - /config/envoy.json - --xds-address=contour - --xds-port=8001 + - --xds-resource-version=v3 - --resources-dir=/config/resources - --envoy-cafile=/certs/ca.crt - --envoy-cert-file=/certs/tls.crt diff --git a/internal/protobuf/helpers.go b/internal/protobuf/helpers.go index 038f12ec68a..bdcb9c0ae37 100644 --- a/internal/protobuf/helpers.go +++ b/internal/protobuf/helpers.go @@ -95,3 +95,9 @@ func MustMarshalAny(pb proto.Message) *any.Any { return a } + +// AnyMessageTypeOf returns the any.Any type of msg. +func AnyMessageTypeOf(msg proto.Message) string { + a := MustMarshalAny(msg) + return a.TypeUrl +} diff --git a/internal/xds/util.go b/internal/xds/util.go index 64dcae3db6d..8448a27e0cb 100644 --- a/internal/xds/util.go +++ b/internal/xds/util.go @@ -14,8 +14,33 @@ package xds import ( + "fmt" + "log" "strings" + envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + envoy_api_auth_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" + envoy_api_core_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + envoy_config_accesslog_v2 "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v2" + envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + envoy_config_filter_http_ext_authz_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/http/ext_authz/v2" + envoy_config_filter_http_lua_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/http/lua/v2" + envoy_config_filter_network_http_connection_manager_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" + envoy_config_filter_network_tcp_proxy_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2" + envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + envoy_extensions_access_loggers_file_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3" + envoy_extensions_filters_http_ext_authz_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_authz/v3" + envoy_extensions_filters_http_lua_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3" + envoy_extensions_filters_network_http_connection_manager_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" + envoy_extensions_filters_network_tcp_proxy_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3" + envoy_extensions_transport_sockets_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/any" + "github.com/projectcontour/contour/internal/protobuf" + + "github.com/golang/protobuf/proto" "k8s.io/apimachinery/pkg/types" ) @@ -38,3 +63,161 @@ func ClusterLoadAssignmentName(service types.NamespacedName, portName string) st return strings.Join(name, "/") } + +// TypeMapping maps xDS type URLs from v2 to v3. +var TypeMapping map[string]string + +func init() { + TypeMapping = make(map[string]string) + + entry := func(from proto.Message, to proto.Message) { + TypeMapping[protobuf.AnyMessageTypeOf(from)] = protobuf.AnyMessageTypeOf(to) + } + + // Fundamental xDS resource types. + entry(&envoy_api_v2.Listener{}, &envoy_config_listener_v3.Listener{}) + entry(&envoy_api_v2.Cluster{}, &envoy_config_cluster_v3.Cluster{}) + entry(&envoy_api_v2.RouteConfiguration{}, &envoy_config_route_v3.RouteConfiguration{}) + entry(&envoy_api_v2.ClusterLoadAssignment{}, &envoy_config_endpoint_v3.ClusterLoadAssignment{}) + entry(&envoy_api_auth_v2.Secret{}, &envoy_extensions_transport_sockets_tls_v3.Secret{}) + + // Other embedded resources used by Contour. + entry(&envoy_config_accesslog_v2.FileAccessLog{}, + &envoy_extensions_access_loggers_file_v3.FileAccessLog{}) + + entry(&envoy_config_filter_http_ext_authz_v2.ExtAuthzPerRoute{}, + &envoy_extensions_filters_http_ext_authz_v3.ExtAuthzPerRoute{}) + + entry(&envoy_config_filter_http_ext_authz_v2.ExtAuthz{}, + &envoy_extensions_filters_http_ext_authz_v3.ExtAuthz{}) + + entry(&envoy_config_filter_http_lua_v2.Lua{}, + &envoy_extensions_filters_http_lua_v3.Lua{}) + + entry(&envoy_api_auth_v2.UpstreamTlsContext{}, + &envoy_extensions_transport_sockets_tls_v3.UpstreamTlsContext{}) + + entry(&envoy_api_auth_v2.DownstreamTlsContext{}, + &envoy_extensions_transport_sockets_tls_v3.DownstreamTlsContext{}) + + entry(&envoy_config_filter_network_http_connection_manager_v2.HttpConnectionManager{}, + &envoy_extensions_filters_network_http_connection_manager_v3.HttpConnectionManager{}) + + entry(&envoy_config_filter_network_tcp_proxy_v2.TcpProxy{}, + &envoy_extensions_filters_network_tcp_proxy_v3.TcpProxy{}) +} + +func rewriteAnyMessage(a *any.Any) { + if a != nil { + anyval := ptypes.DynamicAny{} + if err := ptypes.UnmarshalAny(a, &anyval); err != nil { + panic(fmt.Sprintf("failed to unmarshal %T: %s", a, err.Error())) + } + + newMsg := protobuf.MustMarshalAny(Rewrite(anyval.Message)) + if replacement, ok := TypeMapping[newMsg.TypeUrl]; ok { + newMsg.TypeUrl = replacement + } + + a.TypeUrl = newMsg.TypeUrl + a.Value = newMsg.Value + } +} + +func rewriteConfigSource(s *envoy_api_core_v2.ConfigSource) { + if s != nil { + s.ResourceApiVersion = envoy_api_core_v2.ApiVersion_V3 + s.GetApiConfigSource().TransportApiVersion = envoy_api_core_v2.ApiVersion_V3 + } +} + +// Rewrite changes the given xDS message to use the v3 xDS API. +// +// Since the v2 and v3 APIs are wire-compatible, we just rewrite +// the type names for type URLs in any.Any messages. This allows Envoy +// to do the actual conversion, and Envoy takes care of migrating +// deprecated fields. +func Rewrite(in proto.Message) proto.Message { + switch msg := in.(type) { + case *envoy_api_v2.ClusterLoadAssignment: + return msg + case *envoy_api_auth_v2.Secret: + return msg + + case *envoy_api_v2.Cluster: + if e := msg.GetEdsClusterConfig(); e != nil { + rewriteConfigSource(e.GetEdsConfig()) + } + + if t := msg.GetTransportSocket(); t != nil { + rewriteAnyMessage(t.GetTypedConfig()) + } + + return msg + + case *envoy_api_v2.RouteConfiguration: + for _, v := range msg.GetVirtualHosts() { + for _, r := range v.GetRoutes() { + for _, conf := range r.GetTypedPerFilterConfig() { + rewriteAnyMessage(conf) + } + } + } + + return msg + + case *envoy_api_v2.Listener: + for _, filter := range msg.ListenerFilters { + rewriteAnyMessage(filter.GetTypedConfig()) + } + + for _, chain := range msg.FilterChains { + for _, filter := range chain.Filters { + rewriteAnyMessage(filter.GetTypedConfig()) + } + + if t := chain.GetTransportSocket(); t != nil { + rewriteAnyMessage(t.GetTypedConfig()) + } + } + + for _, a := range msg.AccessLog { + rewriteAnyMessage(a.GetTypedConfig()) + } + + return msg + + case *envoy_config_filter_network_http_connection_manager_v2.HttpConnectionManager: + if r := msg.GetRds(); r != nil { + rewriteConfigSource(r.GetConfigSource()) + } + + for _, f := range msg.HttpFilters { + rewriteAnyMessage(f.GetTypedConfig()) + } + + for _, l := range msg.AccessLog { + rewriteAnyMessage(l.GetTypedConfig()) + } + + return msg + + case *envoy_api_auth_v2.DownstreamTlsContext: + for _, s := range msg.GetCommonTlsContext().TlsCertificateSdsSecretConfigs { + rewriteConfigSource(s.GetSdsConfig()) + } + + return msg + + case *envoy_api_auth_v2.UpstreamTlsContext: + for _, s := range msg.GetCommonTlsContext().TlsCertificateSdsSecretConfigs { + rewriteConfigSource(s.GetSdsConfig()) + } + + return msg + + default: + log.Printf("missing conversion for %T, good luck", msg) + return msg + } +} diff --git a/internal/xds/v2/contour.go b/internal/xds/v2/contour.go index 135500368fc..5bdf7024c3e 100644 --- a/internal/xds/v2/contour.go +++ b/internal/xds/v2/contour.go @@ -118,7 +118,7 @@ func (s *contourServer) stream(st grpcStream) error { } log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl) - log.Info("handling xDS resource request") + log.Info("handling v2 xDS resource request") // now we wait for a notification, if this is the first request received on this // connection last will be less than zero and that will trigger a response immediately. diff --git a/internal/xds/v3/contour.go b/internal/xds/v3/contour.go index 13766ef296e..e7f0ed4781b 100644 --- a/internal/xds/v3/contour.go +++ b/internal/xds/v3/contour.go @@ -48,6 +48,11 @@ func NewContourServer(log logrus.FieldLogger, resources ...xds.Resource) Server for i, r := range resources { c.resources[r.TypeURL()] = resources[i] + + // Map the xDS v3 resource to this provider. + if v3, ok := xds.TypeMapping[r.TypeURL()]; ok { + c.resources[v3] = resources[i] + } } return &c @@ -114,15 +119,17 @@ func (s *contourServer) stream(st grpcStream) error { log.WithField("code", status.Code).Error(status.Message) } - // from the request we derive the resource to stream which have + // From the request we derive the resource to stream which have // been registered according to the typeURL. - r, ok := s.resources[req.TypeUrl] + r, ok := s.resources[req.GetTypeUrl()] if !ok { - return done(log, fmt.Errorf("no resource registered for typeURL %q", req.TypeUrl)) + if !ok { + return done(log, fmt.Errorf("no resource registered for typeURL %q", req.GetTypeUrl())) + } } - log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl) - log.Info("handling xDS resource request") + log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.GetTypeUrl()) + log.Info("handling v3 xDS resource request") // now we wait for a notification, if this is the first request received on this // connection last will be less than zero and that will trigger a response immediately. @@ -144,6 +151,11 @@ func (s *contourServer) stream(st grpcStream) error { resources = r.Query(req.ResourceNames) } + // Rewrite the embedded message types to v3. + for _, r := range resources { + xds.Rewrite(r) + } + any := make([]*any.Any, 0, len(resources)) for _, r := range resources { a, err := ptypes.MarshalAny(r) @@ -151,13 +163,14 @@ func (s *contourServer) stream(st grpcStream) error { return done(log, err) } + a.TypeUrl = req.GetTypeUrl() any = append(any, a) } resp := &envoy_service_discovery_v3.DiscoveryResponse{ VersionInfo: strconv.Itoa(last), Resources: any, - TypeUrl: r.TypeURL(), + TypeUrl: req.GetTypeUrl(), Nonce: strconv.Itoa(last), }