Skip to content

Commit

Permalink
internal/xds: convert v3 protobufs automatically
Browse files Browse the repository at this point in the history
Force support for xDS v3 by mapping the v3 endpoints to the internal v2
resource caches. Since the xDS upgrade is wire-compatible, we can force
an initial upgrade my rewriting the type URLs of embedded messages and
forcing v3 in the dynamic resource configuration.

This updates #1898.

Signed-off-by: James Peach <jpeach@vmware.com>
  • Loading branch information
jpeach committed Oct 30, 2020
1 parent ad9adf9 commit 0bdf204
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 8 deletions.
9 changes: 8 additions & 1 deletion cmd/contour/serve.go
Expand Up @@ -654,8 +654,15 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {
case config.EnvoyServerType:
contour_xds_v2.RegisterServer(envoy_server_v2.NewServer(context.Background(), snapshotCache, nil), grpcServer)
case config.ContourServerType:
contour_xds_v2.RegisterServer(contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)

// Check an internal feature flag to disable xDS v2 endpoints. This is strictly for testing.
if config.GetenvOr("CONTOUR_INTERNAL_DISABLE_XDSV2", "N") != "N" {
contour_xds_v2.RegisterServer(contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...), grpcServer)
}
default:
// This can't happen due to config validation.
log.Fatalf("invalid xdsServerType %q configured", ctx.Config.Server.XDSServerType)
}

addr := net.JoinHostPort(ctx.xdsAddr, strconv.Itoa(ctx.xdsPort))
Expand Down
1 change: 1 addition & 0 deletions examples/contour/03-envoy.yaml
Expand Up @@ -99,6 +99,7 @@ spec:
- /config/envoy.json
- --xds-address=contour
- --xds-port=8001
- --xds-resource-version=v3
- --resources-dir=/config/resources
- --envoy-cafile=/certs/ca.crt
- --envoy-cert-file=/certs/tls.crt
Expand Down
1 change: 1 addition & 0 deletions examples/render/contour.yaml
Expand Up @@ -1849,6 +1849,7 @@ spec:
- /config/envoy.json
- --xds-address=contour
- --xds-port=8001
- --xds-resource-version=v3
- --resources-dir=/config/resources
- --envoy-cafile=/certs/ca.crt
- --envoy-cert-file=/certs/tls.crt
Expand Down
6 changes: 6 additions & 0 deletions internal/protobuf/helpers.go
Expand Up @@ -95,3 +95,9 @@ func MustMarshalAny(pb proto.Message) *any.Any {

return a
}

// AnyMessageTypeOf returns the any.Any type of msg.
func AnyMessageTypeOf(msg proto.Message) string {
a := MustMarshalAny(msg)
return a.TypeUrl
}
183 changes: 183 additions & 0 deletions internal/xds/util.go
Expand Up @@ -14,8 +14,33 @@
package xds

import (
"fmt"
"log"
"strings"

envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_api_auth_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
envoy_api_core_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
envoy_config_accesslog_v2 "github.com/envoyproxy/go-control-plane/envoy/config/accesslog/v2"
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
envoy_config_filter_http_ext_authz_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/http/ext_authz/v2"
envoy_config_filter_http_lua_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/http/lua/v2"
envoy_config_filter_network_http_connection_manager_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
envoy_config_filter_network_tcp_proxy_v2 "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2"
envoy_config_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_config_route_v3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
envoy_extensions_access_loggers_file_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/access_loggers/file/v3"
envoy_extensions_filters_http_ext_authz_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_authz/v3"
envoy_extensions_filters_http_lua_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/lua/v3"
envoy_extensions_filters_network_http_connection_manager_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
envoy_extensions_filters_network_tcp_proxy_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/tcp_proxy/v3"
envoy_extensions_transport_sockets_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/projectcontour/contour/internal/protobuf"

"github.com/golang/protobuf/proto"
"k8s.io/apimachinery/pkg/types"
)

Expand All @@ -38,3 +63,161 @@ func ClusterLoadAssignmentName(service types.NamespacedName, portName string) st

return strings.Join(name, "/")
}

// TypeMapping maps xDS type URLs from v2 to v3.
var TypeMapping map[string]string

func init() {
TypeMapping = make(map[string]string)

entry := func(from proto.Message, to proto.Message) {
TypeMapping[protobuf.AnyMessageTypeOf(from)] = protobuf.AnyMessageTypeOf(to)
}

// Fundamental xDS resource types.
entry(&envoy_api_v2.Listener{}, &envoy_config_listener_v3.Listener{})
entry(&envoy_api_v2.Cluster{}, &envoy_config_cluster_v3.Cluster{})
entry(&envoy_api_v2.RouteConfiguration{}, &envoy_config_route_v3.RouteConfiguration{})
entry(&envoy_api_v2.ClusterLoadAssignment{}, &envoy_config_endpoint_v3.ClusterLoadAssignment{})
entry(&envoy_api_auth_v2.Secret{}, &envoy_extensions_transport_sockets_tls_v3.Secret{})

// Other embedded resources used by Contour.
entry(&envoy_config_accesslog_v2.FileAccessLog{},
&envoy_extensions_access_loggers_file_v3.FileAccessLog{})

entry(&envoy_config_filter_http_ext_authz_v2.ExtAuthzPerRoute{},
&envoy_extensions_filters_http_ext_authz_v3.ExtAuthzPerRoute{})

entry(&envoy_config_filter_http_ext_authz_v2.ExtAuthz{},
&envoy_extensions_filters_http_ext_authz_v3.ExtAuthz{})

entry(&envoy_config_filter_http_lua_v2.Lua{},
&envoy_extensions_filters_http_lua_v3.Lua{})

entry(&envoy_api_auth_v2.UpstreamTlsContext{},
&envoy_extensions_transport_sockets_tls_v3.UpstreamTlsContext{})

entry(&envoy_api_auth_v2.DownstreamTlsContext{},
&envoy_extensions_transport_sockets_tls_v3.DownstreamTlsContext{})

entry(&envoy_config_filter_network_http_connection_manager_v2.HttpConnectionManager{},
&envoy_extensions_filters_network_http_connection_manager_v3.HttpConnectionManager{})

entry(&envoy_config_filter_network_tcp_proxy_v2.TcpProxy{},
&envoy_extensions_filters_network_tcp_proxy_v3.TcpProxy{})
}

func rewriteAnyMessage(a *any.Any) {
if a != nil {
anyval := ptypes.DynamicAny{}
if err := ptypes.UnmarshalAny(a, &anyval); err != nil {
panic(fmt.Sprintf("failed to unmarshal %T: %s", a, err.Error()))
}

newMsg := protobuf.MustMarshalAny(Rewrite(anyval.Message))
if replacement, ok := TypeMapping[newMsg.TypeUrl]; ok {
newMsg.TypeUrl = replacement
}

a.TypeUrl = newMsg.TypeUrl
a.Value = newMsg.Value
}
}

func rewriteConfigSource(s *envoy_api_core_v2.ConfigSource) {
if s != nil {
s.ResourceApiVersion = envoy_api_core_v2.ApiVersion_V3
s.GetApiConfigSource().TransportApiVersion = envoy_api_core_v2.ApiVersion_V3
}
}

// Rewrite changes the given xDS message to use the v3 xDS API.
//
// Since the v2 and v3 APIs are wire-compatible, we just rewrite
// the type names for type URLs in any.Any messages. This allows Envoy
// to do the actual conversion, and Envoy takes care of migrating
// deprecated fields.
func Rewrite(in proto.Message) proto.Message {
switch msg := in.(type) {
case *envoy_api_v2.ClusterLoadAssignment:
return msg
case *envoy_api_auth_v2.Secret:
return msg

case *envoy_api_v2.Cluster:
if e := msg.GetEdsClusterConfig(); e != nil {
rewriteConfigSource(e.GetEdsConfig())
}

if t := msg.GetTransportSocket(); t != nil {
rewriteAnyMessage(t.GetTypedConfig())
}

return msg

case *envoy_api_v2.RouteConfiguration:
for _, v := range msg.GetVirtualHosts() {
for _, r := range v.GetRoutes() {
for _, conf := range r.GetTypedPerFilterConfig() {
rewriteAnyMessage(conf)
}
}
}

return msg

case *envoy_api_v2.Listener:
for _, filter := range msg.ListenerFilters {
rewriteAnyMessage(filter.GetTypedConfig())
}

for _, chain := range msg.FilterChains {
for _, filter := range chain.Filters {
rewriteAnyMessage(filter.GetTypedConfig())
}

if t := chain.GetTransportSocket(); t != nil {
rewriteAnyMessage(t.GetTypedConfig())
}
}

for _, a := range msg.AccessLog {
rewriteAnyMessage(a.GetTypedConfig())
}

return msg

case *envoy_config_filter_network_http_connection_manager_v2.HttpConnectionManager:
if r := msg.GetRds(); r != nil {
rewriteConfigSource(r.GetConfigSource())
}

for _, f := range msg.HttpFilters {
rewriteAnyMessage(f.GetTypedConfig())
}

for _, l := range msg.AccessLog {
rewriteAnyMessage(l.GetTypedConfig())
}

return msg

case *envoy_api_auth_v2.DownstreamTlsContext:
for _, s := range msg.GetCommonTlsContext().TlsCertificateSdsSecretConfigs {
rewriteConfigSource(s.GetSdsConfig())
}

return msg

case *envoy_api_auth_v2.UpstreamTlsContext:
for _, s := range msg.GetCommonTlsContext().TlsCertificateSdsSecretConfigs {
rewriteConfigSource(s.GetSdsConfig())
}

return msg

default:
log.Printf("missing conversion for %T, good luck", msg)
return msg
}
}
2 changes: 1 addition & 1 deletion internal/xds/v2/contour.go
Expand Up @@ -118,7 +118,7 @@ func (s *contourServer) stream(st grpcStream) error {
}

log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl)
log.Info("handling xDS resource request")
log.Info("handling v2 xDS resource request")

// now we wait for a notification, if this is the first request received on this
// connection last will be less than zero and that will trigger a response immediately.
Expand Down
25 changes: 19 additions & 6 deletions internal/xds/v3/contour.go
Expand Up @@ -48,6 +48,11 @@ func NewContourServer(log logrus.FieldLogger, resources ...xds.Resource) Server

for i, r := range resources {
c.resources[r.TypeURL()] = resources[i]

// Map the xDS v3 resource to this provider.
if v3, ok := xds.TypeMapping[r.TypeURL()]; ok {
c.resources[v3] = resources[i]
}
}

return &c
Expand Down Expand Up @@ -114,15 +119,17 @@ func (s *contourServer) stream(st grpcStream) error {
log.WithField("code", status.Code).Error(status.Message)
}

// from the request we derive the resource to stream which have
// From the request we derive the resource to stream which have
// been registered according to the typeURL.
r, ok := s.resources[req.TypeUrl]
r, ok := s.resources[req.GetTypeUrl()]
if !ok {
return done(log, fmt.Errorf("no resource registered for typeURL %q", req.TypeUrl))
if !ok {
return done(log, fmt.Errorf("no resource registered for typeURL %q", req.GetTypeUrl()))
}
}

log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl)
log.Info("handling xDS resource request")
log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.GetTypeUrl())
log.Info("handling v3 xDS resource request")

// now we wait for a notification, if this is the first request received on this
// connection last will be less than zero and that will trigger a response immediately.
Expand All @@ -144,20 +151,26 @@ func (s *contourServer) stream(st grpcStream) error {
resources = r.Query(req.ResourceNames)
}

// Rewrite the embedded message types to v3.
for _, r := range resources {
xds.Rewrite(r)
}

any := make([]*any.Any, 0, len(resources))
for _, r := range resources {
a, err := ptypes.MarshalAny(r)
if err != nil {
return done(log, err)
}

a.TypeUrl = req.GetTypeUrl()
any = append(any, a)
}

resp := &envoy_service_discovery_v3.DiscoveryResponse{
VersionInfo: strconv.Itoa(last),
Resources: any,
TypeUrl: r.TypeURL(),
TypeUrl: req.GetTypeUrl(),
Nonce: strconv.Itoa(last),
}

Expand Down

0 comments on commit 0bdf204

Please sign in to comment.