Skip to content

Commit

Permalink
internal/xds: Refactor xDS Server into a v2 package
Browse files Browse the repository at this point in the history
Refactors the xDS Server into a v2 package allowing for
a v3 server to be created seperate from the v2 instance.

Updates #1898

Signed-off-by: Steve Sloka <slokas@vmware.com>
  • Loading branch information
stevesloka committed Oct 9, 2020
1 parent 1c77ba9 commit b873b45
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 221 deletions.
7 changes: 4 additions & 3 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/projectcontour/contour/internal/timeout"
"github.com/projectcontour/contour/internal/workgroup"
"github.com/projectcontour/contour/internal/xds"
contour_xds_v2 "github.com/projectcontour/contour/internal/xds/v2"
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v2 "github.com/projectcontour/contour/internal/xdscache/v2"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -623,12 +624,12 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

switch ctx.XDSServerType {
case "contour":
grpcServer = xds.RegisterServer(
xds.NewContourServer(log, xdscache.ResourcesOf(resources)...),
grpcServer = contour_xds_v2.RegisterServer(
contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...),
registry,
ctx.grpcOptions(log)...)
case "envoy":
grpcServer = xds.RegisterServer(
grpcServer = contour_xds_v2.RegisterServer(
server.NewServer(context.Background(), snapshotCache, nil),
registry,
ctx.grpcOptions(log)...)
Expand Down
6 changes: 3 additions & 3 deletions internal/featuretests/v2/featuretests.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/projectcontour/contour/internal/sorter"
"github.com/projectcontour/contour/internal/status"
"github.com/projectcontour/contour/internal/workgroup"
"github.com/projectcontour/contour/internal/xds"
contour_xds_v2 "github.com/projectcontour/contour/internal/xds/v2"
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v2 "github.com/projectcontour/contour/internal/xdscache/v2"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -130,8 +130,8 @@ func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Cont
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

srv := xds.RegisterServer(
xds.NewContourServer(log, xdscache.ResourcesOf(resources)...),
srv := contour_xds_v2.RegisterServer(
contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...),
r /* Prometheus registry */)

var g workgroup.Group
Expand Down
182 changes: 1 addition & 181 deletions internal/xds/contour.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,7 @@

package xds

import (
"context"
"fmt"
"strconv"
"sync/atomic"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/sirupsen/logrus"
)
import "github.com/golang/protobuf/proto"

// Resource represents a source of proto.Messages that can be registered
// for interest.
Expand All @@ -42,171 +30,3 @@ type Resource interface {
// TypeURL returns the typeURL of messages returned from Values.
TypeURL() string
}

type grpcStream interface {
Context() context.Context
Send(*v2.DiscoveryResponse) error
Recv() (*v2.DiscoveryRequest, error)
}

// counter holds an atomically incrementing counter.
type counter uint64

func (c *counter) next() uint64 {
return atomic.AddUint64((*uint64)(c), 1)
}

var connections counter

// NewContourServer creates an internally implemented Server that streams the
// provided set of Resource objects. The returned Server implements the xDS
// State of the World (SotW) variant.
func NewContourServer(log logrus.FieldLogger, resources ...Resource) Server {
c := contourServer{
FieldLogger: log,
resources: map[string]Resource{},
}

for i, r := range resources {
c.resources[r.TypeURL()] = resources[i]
}

return &c
}

type contourServer struct {
// Since we only implement the streaming state of the world
// protocol, embed the default null implementations to handle
// the unimplemented gRPC endpoints.
discovery.UnimplementedAggregatedDiscoveryServiceServer
discovery.UnimplementedSecretDiscoveryServiceServer
v2.UnimplementedRouteDiscoveryServiceServer
v2.UnimplementedEndpointDiscoveryServiceServer
v2.UnimplementedClusterDiscoveryServiceServer
v2.UnimplementedListenerDiscoveryServiceServer

logrus.FieldLogger
resources map[string]Resource
}

// stream processes a stream of DiscoveryRequests.
func (s *contourServer) stream(st grpcStream) error {
// Bump connection counter and set it as a field on the logger.
log := s.WithField("connection", connections.next())

// Notify whether the stream terminated on error.
done := func(log *logrus.Entry, err error) error {
if err != nil {
log.WithError(err).Error("stream terminated")
} else {
log.Info("stream terminated")
}

return err
}

ch := make(chan int, 1)

// internally all registration values start at zero so sending
// a last that is less than zero will guarantee that each stream
// will generate a response immediately, then wait.
last := -1
ctx := st.Context()

// now stick in this loop until the client disconnects.
for {
// first we wait for the request from Envoy, this is part of
// the xDS protocol.
req, err := st.Recv()
if err != nil {
return done(log, err)
}

// note: redeclare log in this scope so the next time around the loop all is forgotten.
log := log.WithField("version_info", req.VersionInfo).WithField("response_nonce", req.ResponseNonce)
if req.Node != nil {
log = log.WithField("node_id", req.Node.Id).WithField("node_version", fmt.Sprintf("v%d.%d.%d", req.Node.GetUserAgentBuildVersion().Version.MajorNumber, req.Node.GetUserAgentBuildVersion().Version.MinorNumber, req.Node.GetUserAgentBuildVersion().Version.Patch))
}

if status := req.ErrorDetail; status != nil {
// if Envoy rejected the last update log the details here.
// TODO(dfc) issue 1176: handle xDS ACK/NACK
log.WithField("code", status.Code).Error(status.Message)
}

// from the request we derive the resource to stream which have
// been registered according to the typeURL.
r, ok := s.resources[req.TypeUrl]
if !ok {
return done(log, fmt.Errorf("no resource registered for typeURL %q", req.TypeUrl))
}

log = log.WithField("resource_names", req.ResourceNames).WithField("type_url", req.TypeUrl)
log.Info("stream_wait")

// now we wait for a notification, if this is the first request received on this
// connection last will be less than zero and that will trigger a response immediately.
r.Register(ch, last, req.ResourceNames...)
select {
case last = <-ch:
// boom, something in the cache has changed.
// TODO(dfc) the thing that has changed may not be in the scope of the filter
// so we're going to be sending an update that is a no-op. See #426

var resources []proto.Message
switch len(req.ResourceNames) {
case 0:
// no resource hints supplied, return the full
// contents of the resource
resources = r.Contents()
default:
// resource hints supplied, return exactly those
resources = r.Query(req.ResourceNames)
}

any := make([]*any.Any, 0, len(resources))
for _, r := range resources {
a, err := ptypes.MarshalAny(r)
if err != nil {
return done(log, err)
}

any = append(any, a)
}

resp := &v2.DiscoveryResponse{
VersionInfo: strconv.Itoa(last),
Resources: any,
TypeUrl: r.TypeURL(),
Nonce: strconv.Itoa(last),
}

if err := st.Send(resp); err != nil {
return done(log, err)
}

case <-ctx.Done():
return done(log, ctx.Err())
}
}
}

func (s *contourServer) StreamClusters(srv v2.ClusterDiscoveryService_StreamClustersServer) error {
return s.stream(srv)
}

func (s *contourServer) StreamEndpoints(srv v2.EndpointDiscoveryService_StreamEndpointsServer) error {
return s.stream(srv)
}

func (s *contourServer) StreamListeners(srv v2.ListenerDiscoveryService_StreamListenersServer) error {
return s.stream(srv)
}

func (s *contourServer) StreamRoutes(srv v2.RouteDiscoveryService_StreamRoutesServer) error {
return s.stream(srv)
}

func (s *contourServer) StreamSecrets(srv discovery.SecretDiscoveryService_StreamSecretsServer) error {
return s.stream(srv)
}

0 comments on commit b873b45

Please sign in to comment.