diff --git a/go.sum b/go.sum index 75edbe3a..1b3e83e0 100644 --- a/go.sum +++ b/go.sum @@ -80,7 +80,6 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334 h1:VHgatEHNcBFEB7inlalqfNqw65aNkM1lGX2yt3NmbS8= github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= @@ -88,12 +87,9 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lyft/protoc-gen-star v0.4.14 h1:HUkD4H4dYFIgu3Bns/3N6J5GmKHCEGnhYBwNu3fvXgA= github.com/lyft/protoc-gen-star v0.4.14/go.mod h1:mE8fbna26u7aEA2QCVvvfBU/ZrPgocG1206xAFPcs94= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -109,7 +105,6 @@ github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg= github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -131,7 +126,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= -github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.6 h1:breEStsVwemnKh2/s6gMvSdMEkwW0sK8vGStnlVBMCs= @@ -157,7 +151,6 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= -go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.14.0 h1:/pduUoebOeeJzTDFuoMgC6nRkiasr1sBCIEorly7m4o= @@ -169,7 +162,6 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -207,9 +199,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -221,7 +211,6 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk= google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= @@ -233,7 +222,6 @@ google.golang.org/protobuf v1.20.1 h1:ESRXHgpUBG5D2I5mmsQIyYxB/tQIZfSZ8wLyFDf/N/ google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5LIWeI/lY= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= @@ -248,5 +236,4 @@ gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/internal/app/cache/cache.go b/internal/app/cache/cache.go index ea0ae37c..aee1647f 100644 --- a/internal/app/cache/cache.go +++ b/internal/app/cache/cache.go @@ -154,7 +154,6 @@ func (c *cache) AddRequest(key string, req *v2.DiscoveryRequest) error { return fmt.Errorf("unable to cast cache value to type resource for key: %s", key) } resource.Requests = append(resource.Requests, req) - resource.ExpirationTime = c.getExpirationTime(time.Now()) c.cache.Add(key, resource) return nil } diff --git a/internal/app/orchestrator/orchestrator.go b/internal/app/orchestrator/orchestrator.go index 02b57ab5..a2612b6b 100644 --- a/internal/app/orchestrator/orchestrator.go +++ b/internal/app/orchestrator/orchestrator.go @@ -165,7 +165,7 @@ func (o *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) // streams. o.upstreamResponseMap.mu.Lock() if _, ok := o.upstreamResponseMap.responseChannel[aggregatedKey]; !ok { - upstreamResponseChan := o.upstreamClient.OpenStream(ctx, &req) + upstreamResponseChan, _, _ := o.upstreamClient.OpenStream(req) respChannel := o.upstreamResponseMap.add(aggregatedKey, upstreamResponseChan) // Spin up a go routine to watch for upstream responses. // One routine is opened per aggregate key. @@ -187,24 +187,24 @@ func (o *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp. func (o *orchestrator) watchUpstream( ctx context.Context, aggregatedKey string, - responseChannel <-chan *upstream.Response, + responseChannel <-chan *discovery.DiscoveryResponse, done <-chan bool, ) { for { select { - case x := <-responseChannel: - if x.Err != nil { + case x, more := <-responseChannel: + if !more { // A problem occurred fetching the response upstream, log and // return the most recent cached response, so that the // downstream will reissue the discovery request. - o.logger.With("err", x.Err).With("key", aggregatedKey).Error(ctx, "upstream error") + o.logger.With("key", aggregatedKey).Error(ctx, "upstream error") } else { // Cache the response. - _, err := o.cache.SetResponse(aggregatedKey, x.Response) + _, err := o.cache.SetResponse(aggregatedKey, *x) if err != nil { // If we fail to cache the new response, log and return the old one. o.logger.With("err", err).With("key", aggregatedKey). - With("resp", x.Response).Error(ctx, "Failed to cache the response") + Error(ctx, "Failed to cache the response") } } @@ -220,7 +220,7 @@ func (o *orchestrator) watchUpstream( } else { if cached == nil || cached.Resp == nil { // If cache is empty, there is nothing to fan out. - if x.Err != nil { + if !more { // Warn. Benefit of the doubt that this is the first request. o.logger.With("key", aggregatedKey). Warn(ctx, "attempted to fan out with no cached response") diff --git a/internal/app/orchestrator/orchestrator_test.go b/internal/app/orchestrator/orchestrator_test.go index ae69e470..59f99860 100644 --- a/internal/app/orchestrator/orchestrator_test.go +++ b/internal/app/orchestrator/orchestrator_test.go @@ -16,6 +16,7 @@ import ( "github.com/envoyproxy/xds-relay/internal/app/cache" "github.com/envoyproxy/xds-relay/internal/app/mapper" "github.com/envoyproxy/xds-relay/internal/app/upstream" + upstream_mock "github.com/envoyproxy/xds-relay/internal/app/upstream/mock" "github.com/envoyproxy/xds-relay/internal/pkg/log" yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util" aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1" @@ -23,34 +24,33 @@ import ( ) type mockSimpleUpstreamClient struct { - responseChan <-chan *upstream.Response + responseChan <-chan *v2.DiscoveryResponse } -func (m mockSimpleUpstreamClient) OpenStream(ctx context.Context, req *v2.DiscoveryRequest) <-chan *upstream.Response { - return m.responseChan +func (m mockSimpleUpstreamClient) OpenStream(req v2.DiscoveryRequest) (<-chan *v2.DiscoveryResponse, func(), error) { + return m.responseChan, func() {}, nil } type mockMultiStreamUpstreamClient struct { - ldsResponseChan <-chan *upstream.Response - cdsResponseChan <-chan *upstream.Response + ldsResponseChan <-chan *v2.DiscoveryResponse + cdsResponseChan <-chan *v2.DiscoveryResponse t *testing.T mapper mapper.Mapper } -func (m mockMultiStreamUpstreamClient) OpenStream(ctx context.Context, - req *v2.DiscoveryRequest) <-chan *upstream.Response { - aggregatedKey, err := m.mapper.GetKey(*req) +func (m mockMultiStreamUpstreamClient) OpenStream(req v2.DiscoveryRequest) (<-chan *v2.DiscoveryResponse, func(), error) { + aggregatedKey, err := m.mapper.GetKey(req) assert.NoError(m.t, err) if aggregatedKey == "lds" { - return m.ldsResponseChan + return m.ldsResponseChan, func() {}, nil } else if aggregatedKey == "cds" { - return m.cdsResponseChan + return m.cdsResponseChan, func() {}, nil } m.t.Errorf("Unsupported aggregated key, %s", aggregatedKey) - return nil + return nil, func() {}, nil } func newMockOrchestrator(t *testing.T, mapper mapper.Mapper, upstreamClient upstream.Client) *orchestrator { @@ -96,8 +96,12 @@ func assertEqualResources(t *testing.T, got gcp.Response, expected v2.DiscoveryR func TestNew(t *testing.T) { // Trivial test to ensure orchestrator instantiates. - upstreamClient, err := upstream.NewClient(context.Background(), "example.com") - assert.NoError(t, err) + upstreamClient := upstream_mock.NewClient( + context.Background(), + upstream.CallOptions{}, + nil, + nil, + func(m interface{}) error { return nil }) config := aggregationv1.KeyerConfiguration{ Fragments: []*aggregationv1.KeyerConfiguration_Fragment{ @@ -120,7 +124,7 @@ func TestNew(t *testing.T) { } func TestGoldenPath(t *testing.T) { - upstreamResponseChannel := make(chan *upstream.Response) + upstreamResponseChannel := make(chan *v2.DiscoveryResponse) mapper := newMockMapper(t) orchestrator := newMockOrchestrator( t, @@ -140,21 +144,19 @@ func TestGoldenPath(t *testing.T) { assert.Equal(t, 1, len(orchestrator.downstreamResponseMap.responseChannel)) assert.Equal(t, 1, len(orchestrator.upstreamResponseMap.responseChannel)) - upstreamResponse := upstream.Response{ - Response: v2.DiscoveryResponse{ - VersionInfo: "1", - TypeUrl: "type.googleapis.com/envoy.api.v2.Listener", - Resources: []*any.Any{ - &anypb.Any{ - Value: []byte("lds resource"), - }, + resp := v2.DiscoveryResponse{ + VersionInfo: "1", + TypeUrl: "type.googleapis.com/envoy.api.v2.Listener", + Resources: []*any.Any{ + &anypb.Any{ + Value: []byte("lds resource"), }, }, } - upstreamResponseChannel <- &upstreamResponse + upstreamResponseChannel <- &resp gotResponse := <-respChannel - assertEqualResources(t, gotResponse, upstreamResponse.Response, req) + assertEqualResources(t, gotResponse, resp, req) aggregatedKey, err := mapper.GetKey(req) assert.NoError(t, err) @@ -166,7 +168,7 @@ func TestGoldenPath(t *testing.T) { } func TestCachedResponse(t *testing.T) { - upstreamResponseChannel := make(chan *upstream.Response) + upstreamResponseChannel := make(chan *v2.DiscoveryResponse) mapper := newMockMapper(t) orchestrator := newMockOrchestrator( t, @@ -208,20 +210,19 @@ func TestCachedResponse(t *testing.T) { assertEqualResources(t, gotResponse, mockResponse, req) // Attempt pushing a more recent response from upstream. - upstreamResponse := upstream.Response{ - Response: v2.DiscoveryResponse{ - VersionInfo: "2", - TypeUrl: "type.googleapis.com/envoy.api.v2.Listener", - Resources: []*any.Any{ - &anypb.Any{ - Value: []byte("some other lds resource"), - }, + resp := v2.DiscoveryResponse{ + VersionInfo: "2", + TypeUrl: "type.googleapis.com/envoy.api.v2.Listener", + Resources: []*any.Any{ + &anypb.Any{ + Value: []byte("some other lds resource"), }, }, } - upstreamResponseChannel <- &upstreamResponse + + upstreamResponseChannel <- &resp gotResponse = <-respChannel - assertEqualResources(t, gotResponse, upstreamResponse.Response, req) + assertEqualResources(t, gotResponse, resp, req) assert.Equal(t, 1, len(orchestrator.upstreamResponseMap.responseChannel)) // Test scenario with same request and response version. @@ -247,8 +248,8 @@ func TestCachedResponse(t *testing.T) { } func TestMultipleWatchesAndUpstreams(t *testing.T) { - upstreamResponseChannelLDS := make(chan *upstream.Response) - upstreamResponseChannelCDS := make(chan *upstream.Response) + upstreamResponseChannelLDS := make(chan *v2.DiscoveryResponse) + upstreamResponseChannelCDS := make(chan *v2.DiscoveryResponse) mapper := newMockMapper(t) orchestrator := newMockOrchestrator( t, @@ -279,25 +280,21 @@ func TestMultipleWatchesAndUpstreams(t *testing.T) { respChannel3, cancelWatch3 := orchestrator.CreateWatch(req3) assert.NotNil(t, respChannel3) - upstreamResponseLDS := upstream.Response{ - Response: v2.DiscoveryResponse{ - VersionInfo: "1", - TypeUrl: "type.googleapis.com/envoy.api.v2.Listener", - Resources: []*any.Any{ - &anypb.Any{ - Value: []byte("lds resource"), - }, + upstreamResponseLDS := v2.DiscoveryResponse{ + VersionInfo: "1", + TypeUrl: "type.googleapis.com/envoy.api.v2.Listener", + Resources: []*any.Any{ + &anypb.Any{ + Value: []byte("lds resource"), }, }, } - upstreamResponseCDS := upstream.Response{ - Response: v2.DiscoveryResponse{ - VersionInfo: "1", - TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster", - Resources: []*any.Any{ - &anypb.Any{ - Value: []byte("cds resource"), - }, + upstreamResponseCDS := v2.DiscoveryResponse{ + VersionInfo: "1", + TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster", + Resources: []*any.Any{ + &anypb.Any{ + Value: []byte("cds resource"), }, }, } @@ -317,9 +314,9 @@ func TestMultipleWatchesAndUpstreams(t *testing.T) { assert.Equal(t, 3, len(orchestrator.downstreamResponseMap.responseChannel)) assert.Equal(t, 2, len(orchestrator.upstreamResponseMap.responseChannel)) - assertEqualResources(t, gotResponseFromChannel1, upstreamResponseLDS.Response, req1) - assertEqualResources(t, gotResponseFromChannel2, upstreamResponseLDS.Response, req2) - assertEqualResources(t, gotResponseFromChannel3, upstreamResponseCDS.Response, req3) + assertEqualResources(t, gotResponseFromChannel1, upstreamResponseLDS, req1) + assertEqualResources(t, gotResponseFromChannel2, upstreamResponseLDS, req2) + assertEqualResources(t, gotResponseFromChannel3, upstreamResponseCDS, req3) orchestrator.shutdown(aggregatedKeyLDS) orchestrator.shutdown(aggregatedKeyCDS) diff --git a/internal/app/orchestrator/upstream.go b/internal/app/orchestrator/upstream.go index eabd9a27..eb4590ef 100644 --- a/internal/app/orchestrator/upstream.go +++ b/internal/app/orchestrator/upstream.go @@ -3,7 +3,7 @@ package orchestrator import ( "sync" - "github.com/envoyproxy/xds-relay/internal/app/upstream" + discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2" ) // upstream is the map of aggregate key to the receive-only upstream @@ -14,7 +14,7 @@ type upstreamResponseMap struct { } type upstreamResponseChannel struct { - response <-chan *upstream.Response + response <-chan *discovery.DiscoveryResponse done chan bool } @@ -24,7 +24,7 @@ type upstreamResponseChannel struct { // Expects orchestrator to manage the lock since this is called simultaneously // with stream open. func (u *upstreamResponseMap) add(aggregatedKey string, - responseChannel <-chan *upstream.Response) upstreamResponseChannel { + responseChannel <-chan *discovery.DiscoveryResponse) upstreamResponseChannel { channel := upstreamResponseChannel{ response: responseChannel, done: make(chan bool, 1), diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 2d345413..6b94c405 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -4,6 +4,7 @@ import ( "context" "net" "strconv" + "time" "github.com/envoyproxy/xds-relay/internal/app/mapper" "github.com/envoyproxy/xds-relay/internal/app/orchestrator" @@ -38,7 +39,14 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, // Initialize upstream client. upstreamPort := strconv.FormatUint(uint64(bootstrapConfig.OriginServer.Address.PortValue), 10) upstreamAddress := net.JoinHostPort(bootstrapConfig.OriginServer.Address.Address, upstreamPort) - upstreamClient, err := upstream.NewClient(ctx, upstreamAddress) + // TODO: configure timeout param from bootstrap config. + // https://github.com/envoyproxy/xds-relay/issues/55 + upstreamClient, err := upstream.NewClient( + ctx, + upstreamAddress, + upstream.CallOptions{Timeout: time.Minute}, + logger.Named("xdsclient"), + ) if err != nil { logger.With("error", err).Panic(ctx, "failed to initialize upstream client") } diff --git a/internal/app/upstream/client.go b/internal/app/upstream/client.go index a15e9fec..2a9a1d6c 100644 --- a/internal/app/upstream/client.go +++ b/internal/app/upstream/client.go @@ -2,10 +2,30 @@ package upstream import ( "context" + "fmt" + "time" v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/envoyproxy/xds-relay/internal/pkg/log" + "google.golang.org/grpc" ) +const ( + // ListenerTypeURL is the resource url for listener + ListenerTypeURL = "type.googleapis.com/envoy.api.v2.Listener" + // ClusterTypeURL is the resource url for cluster + ClusterTypeURL = "type.googleapis.com/envoy.api.v2.Cluster" + // EndpointTypeURL is the resource url for endpoints + EndpointTypeURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" + // RouteTypeURL is the resource url for route + RouteTypeURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration" +) + +// UnsupportedResourceError is a custom error for unsupported typeURL +type UnsupportedResourceError struct { + TypeURL string +} + // Client handles the requests and responses from the origin server. // The xds client handles each xds request on a separate stream, // e.g. 2 different cds requests happen on 2 separate streams. @@ -22,31 +42,33 @@ type Client interface { // All responses from the origin server are sent back through the callback function. // // OpenStream uses the retry and timeout configurations to make best effort to get the responses from origin server. - // If the timeouts are exhausted, receive fails or a irrecoverable error occurs, the error is sent back to the caller. + // If the timeouts are exhausted, receive fails or a irrecoverable error occurs, the response channel is closed. // It is the caller's responsibility to send a new request from the last known DiscoveryRequest. - // Cancellation of the context cleans up all outstanding streams and releases all resources. - OpenStream(context.Context, *v2.DiscoveryRequest) <-chan *Response + // The shutdown function should be invoked to signal stream closure. + // The shutdown function represents the intent that a stream is supposed to be closed. + // All goroutines that depend on the ctx object should consider ctx.Done to be related to shutdown. + // All such scenarios need to exit cleanly and are not considered an erroneous situation. + OpenStream(v2.DiscoveryRequest) (<-chan *v2.DiscoveryResponse, func(), error) } type client struct { - //nolint - ldsClient v2.ListenerDiscoveryServiceClient - //nolint - rdsClient v2.RouteDiscoveryServiceClient - //nolint - edsClient v2.EndpointDiscoveryServiceClient - //nolint - cdsClient v2.ClusterDiscoveryServiceClient + ldsClient v2.ListenerDiscoveryServiceClient + rdsClient v2.RouteDiscoveryServiceClient + edsClient v2.EndpointDiscoveryServiceClient + cdsClient v2.ClusterDiscoveryServiceClient + callOptions CallOptions + logger log.Logger } -// Response struct is a holder for the result from a single request. -// A request can result in a response from origin server or an error -// Only one of the fields is valid at any time. If the error is set, the response will be ignored. -type Response struct { - //nolint - Response v2.DiscoveryResponse - //nolint - Err error +// CallOptions contains grpc client call options +type CallOptions struct { + // Timeout is the time to wait on a blocking grpc SendMsg. + Timeout time.Duration +} + +type version struct { + version string + nonce string } // NewClient creates a grpc connection with an upstream origin server. @@ -55,11 +77,177 @@ type Response struct { // // The method does not block until the underlying connection is up. // Returns immediately and connecting the server happens in background -// TODO: pass retry/timeout configurations -func NewClient(ctx context.Context, url string) (Client, error) { - return &client{}, nil +func NewClient(ctx context.Context, url string, callOptions CallOptions, logger log.Logger) (Client, error) { + // TODO: configure grpc options.https://github.com/envoyproxy/xds-relay/issues/55 + conn, err := grpc.Dial(url, grpc.WithInsecure()) + if err != nil { + return nil, err + } + + ldsClient := v2.NewListenerDiscoveryServiceClient(conn) + rdsClient := v2.NewRouteDiscoveryServiceClient(conn) + edsClient := v2.NewEndpointDiscoveryServiceClient(conn) + cdsClient := v2.NewClusterDiscoveryServiceClient(conn) + + go shutDown(ctx, conn) + + return &client{ + ldsClient: ldsClient, + rdsClient: rdsClient, + edsClient: edsClient, + cdsClient: cdsClient, + callOptions: callOptions, + logger: logger, + }, nil +} + +func (m *client) OpenStream(request v2.DiscoveryRequest) (<-chan *v2.DiscoveryResponse, func(), error) { + ctx, cancel := context.WithCancel(context.Background()) + var stream grpc.ClientStream + var err error + switch request.GetTypeUrl() { + case ListenerTypeURL: + stream, err = m.ldsClient.StreamListeners(ctx) + case ClusterTypeURL: + stream, err = m.cdsClient.StreamClusters(ctx) + case RouteTypeURL: + stream, err = m.rdsClient.StreamRoutes(ctx) + case EndpointTypeURL: + stream, err = m.edsClient.StreamEndpoints(ctx) + default: + defer cancel() + m.logger.Error(ctx, "Unsupported Type Url %s", request.GetTypeUrl()) + return nil, nil, &UnsupportedResourceError{TypeURL: request.GetTypeUrl()} + } + + if err != nil { + defer cancel() + return nil, nil, err + } + + signal := make(chan *version, 1) + // The xds protocol https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack + // specifies that the first request be empty nonce and empty version. + // The origin server will respond with the latest version. + signal <- &version{nonce: "", version: ""} + + response := make(chan *v2.DiscoveryResponse) + + go send(ctx, m.logger, cancel, &request, stream, signal, m.callOptions) + go recv(ctx, cancel, m.logger, response, stream, signal) + + // We use context cancellation over using a separate channel for signalling stream shutdown. + // The reason is cancelling a context tied with the stream is straightforward to signal closure. + // Also, the shutdown function could potentially be called more than once by a caller. + // Closing channels is not idempotent while cancelling context is idempotent. + return response, func() { cancel() }, nil +} + +// It is safe to assume send goroutine will not leak as long as these conditions are true: +// - SendMsg is performed with timeout. +// - send is a receiver for signal and exits when signal is closed by the owner. +// - send also exits on context cancellations. +func send( + ctx context.Context, + logger log.Logger, + cancelFunc context.CancelFunc, + request *v2.DiscoveryRequest, + stream grpc.ClientStream, + signal chan *version, + callOptions CallOptions) { + for { + select { + case sig, ok := <-signal: + if !ok { + return + } + request.ResponseNonce = sig.nonce + request.VersionInfo = sig.version + err := doWithTimeout(ctx, func() error { + return stream.SendMsg(request) + }, callOptions.Timeout) + if err != nil { + handleError(ctx, logger, "Error in SendMsg", cancelFunc, err) + return + } + case <-ctx.Done(): + _ = stream.CloseSend() + return + } + } +} + +// recv is an infinite loop which blocks on RecvMsg. +// The only ways to exit the goroutine is by cancelling the context or when an error occurs. +func recv( + ctx context.Context, + cancelFunc context.CancelFunc, + logger log.Logger, + response chan *v2.DiscoveryResponse, + stream grpc.ClientStream, + signal chan *version) { + for { + resp := new(v2.DiscoveryResponse) + if err := stream.RecvMsg(resp); err != nil { + handleError(ctx, logger, "Error in RecvMsg", cancelFunc, err) + break + } + select { + case <-ctx.Done(): + break + default: + response <- resp + signal <- &version{version: resp.GetVersionInfo(), nonce: resp.GetNonce()} + } + } + closeChannels(signal, response) +} + +func handleError(ctx context.Context, logger log.Logger, errMsg string, cancelFunc context.CancelFunc, err error) { + defer cancelFunc() + select { + case <-ctx.Done(): + // Context was cancelled, hence this is not an erroneous scenario. + // Context is cancelled only when shutdown is called or any of the send/recv goroutines error out. + // The shutdown can be called by the caller in many cases, during app shutdown/ttl expiry, etc + default: + logger.Error(ctx, "%s: %s", errMsg, err.Error()) + } +} + +// closeChannels is called whenever the context is cancelled (ctx.Done) in Send and Recv goroutines. +// It is also called when an irrecoverable error occurs and the error is passed to the caller. +func closeChannels(versionChan chan *version, responseChan chan *v2.DiscoveryResponse) { + close(versionChan) + close(responseChan) +} + +// shutDown should be called in a separate goroutine. +// This is a blocking function that closes the upstream connection on context completion. +func shutDown(ctx context.Context, conn *grpc.ClientConn) { + <-ctx.Done() + conn.Close() +} + +// DoWithTimeout runs f and returns its error. If the deadline d elapses first, +// it returns a DeadlineExceeded error instead. +// Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717 +func doWithTimeout(ctx context.Context, f func() error, d time.Duration) error { + timeoutCtx, cancel := context.WithTimeout(ctx, d) + defer cancel() + errChan := make(chan error, 1) + go func() { + errChan <- f() + close(errChan) + }() + select { + case <-timeoutCtx.Done(): + return timeoutCtx.Err() + case err := <-errChan: + return err + } } -func (m *client) OpenStream(ctx context.Context, request *v2.DiscoveryRequest) <-chan *Response { - return nil +func (e *UnsupportedResourceError) Error() string { + return fmt.Sprintf("Unsupported resource typeUrl: %s", e.TypeURL) } diff --git a/internal/app/upstream/client_mock.go b/internal/app/upstream/client_mock.go new file mode 100644 index 00000000..b52bf327 --- /dev/null +++ b/internal/app/upstream/client_mock.go @@ -0,0 +1,26 @@ +package upstream + +import ( + "context" + + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/envoyproxy/xds-relay/internal/pkg/log" +) + +// NewMockClient creates a mock implementation for testing +func NewMockClient( + ctx context.Context, + ldsClient v2.ListenerDiscoveryServiceClient, + rdsClient v2.RouteDiscoveryServiceClient, + edsClient v2.EndpointDiscoveryServiceClient, + cdsClient v2.ClusterDiscoveryServiceClient, + callOptions CallOptions) Client { + return &client{ + ldsClient: ldsClient, + rdsClient: rdsClient, + edsClient: edsClient, + cdsClient: cdsClient, + callOptions: callOptions, + logger: log.New("panic"), + } +} diff --git a/internal/app/upstream/client_test.go b/internal/app/upstream/client_test.go new file mode 100644 index 00000000..152cabd9 --- /dev/null +++ b/internal/app/upstream/client_test.go @@ -0,0 +1,207 @@ +package upstream_test + +import ( + "context" + "fmt" + "strconv" + "testing" + "time" + + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + "github.com/envoyproxy/xds-relay/internal/app/upstream" + mock "github.com/envoyproxy/xds-relay/internal/app/upstream/mock" + "github.com/stretchr/testify/assert" +) + +type CallOptions = upstream.CallOptions + +func TestOpenStreamShouldReturnErrorForInvalidTypeUrl(t *testing.T) { + client := createMockClient() + + respCh, _, err := client.OpenStream(v2.DiscoveryRequest{}) + assert.NotNil(t, err) + _, ok := err.(*upstream.UnsupportedResourceError) + assert.True(t, ok) + assert.Nil(t, respCh) +} + +func TestOpenStreamShouldResturnErrorOnStreamCreationFailure(t *testing.T) { + client := createMockClientWithError() + + typeURLs := []string{ + upstream.ListenerTypeURL, + upstream.ClusterTypeURL, + upstream.RouteTypeURL, + upstream.EndpointTypeURL, + } + for _, typeURL := range typeURLs { + t.Run(typeURL, func(t *testing.T) { + respCh, _, err := client.OpenStream(v2.DiscoveryRequest{ + TypeUrl: typeURL, + Node: &core.Node{}, + }) + assert.Nil(t, respCh) + assert.NotNil(t, err) + }) + } +} + +func TestOpenStreamShouldReturnNonEmptyResponseChannel(t *testing.T) { + client := createMockClient() + + respCh, done, err := client.OpenStream(v2.DiscoveryRequest{ + TypeUrl: upstream.ListenerTypeURL, + Node: &core.Node{}, + }) + assert.NotNil(t, respCh) + assert.Nil(t, err) + done() +} + +func TestOpenStreamShouldSendTheFirstRequestToOriginServer(t *testing.T) { + var message *v2.DiscoveryRequest + responseChan := make(chan *v2.DiscoveryResponse) + wait := make(chan bool) + client := mock.NewClient( + context.Background(), + CallOptions{Timeout: time.Nanosecond}, + nil, + responseChan, + func(m interface{}) error { + message = m.(*v2.DiscoveryRequest) + wait <- true + return nil + }, + ) + + node := &core.Node{} + _, done, _ := client.OpenStream(v2.DiscoveryRequest{ + TypeUrl: upstream.ListenerTypeURL, + Node: node, + }) + <-wait + assert.NotNil(t, message) + assert.Equal(t, message.GetNode(), node) + assert.Equal(t, message.TypeUrl, upstream.ListenerTypeURL) + done() +} + +func TestOpenStreamShouldSendErrorIfSendFails(t *testing.T) { + responseChan := make(chan *v2.DiscoveryResponse) + sendError := fmt.Errorf("") + client := createMockClientWithReponse(time.Second, responseChan, func(m interface{}) error { + return sendError + }) + + resp, done, _ := client.OpenStream(v2.DiscoveryRequest{ + TypeUrl: upstream.ListenerTypeURL, + Node: &core.Node{}, + }) + _, more := <-resp + assert.False(t, more) + done() +} + +func TestOpenStreamShouldSendTheResponseOnTheChannel(t *testing.T) { + responseChan := make(chan *v2.DiscoveryResponse) + response := &v2.DiscoveryResponse{} + client := createMockClientWithReponse(time.Second, responseChan, func(m interface{}) error { + responseChan <- response + return nil + }) + + resp, done, err := client.OpenStream(v2.DiscoveryRequest{ + TypeUrl: upstream.ListenerTypeURL, + Node: &core.Node{}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + val := <-resp + assert.Equal(t, val, response) + done() +} + +func TestOpenStreamShouldSendTheNextRequestWithUpdatedVersionAndNonce(t *testing.T) { + responseChan := make(chan *v2.DiscoveryResponse) + lastAppliedVersion := "" + index := 0 + client := createMockClientWithReponse(time.Second, responseChan, func(m interface{}) error { + message := m.(*v2.DiscoveryRequest) + + assert.Equal(t, message.GetVersionInfo(), lastAppliedVersion) + assert.Equal(t, message.GetResponseNonce(), lastAppliedVersion) + + response := &v2.DiscoveryResponse{ + VersionInfo: strconv.Itoa(index), + Nonce: strconv.Itoa(index), + TypeUrl: upstream.ListenerTypeURL, + } + lastAppliedVersion = strconv.Itoa(index) + index++ + responseChan <- response + return nil + }) + + resp, done, err := client.OpenStream(v2.DiscoveryRequest{ + TypeUrl: upstream.ListenerTypeURL, + Node: &core.Node{}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + for i := 0; i < 5; i++ { + val := <-resp + assert.Equal(t, val.GetVersionInfo(), strconv.Itoa(i)) + assert.Equal(t, val.GetNonce(), strconv.Itoa(i)) + } + + done() +} + +func TestOpenStreamShouldSendErrorWhenSendMsgBlocks(t *testing.T) { + responseChan := make(chan *v2.DiscoveryResponse) + blockedCtx, cancel := context.WithCancel(context.Background()) + client := createMockClientWithReponse(time.Nanosecond, responseChan, func(m interface{}) error { + // TODO: When stats are available, strengthen the test + // https://github.com/envoyproxy/xds-relay/issues/61 + <-blockedCtx.Done() + return nil + }) + + resp, done, err := client.OpenStream(v2.DiscoveryRequest{ + TypeUrl: upstream.ListenerTypeURL, + Node: &core.Node{}, + }) + assert.Nil(t, err) + assert.NotNil(t, resp) + _, more := <-resp + assert.False(t, more) + + done() + cancel() +} + +func createMockClient() upstream.Client { + return mock.NewClient( + context.Background(), + CallOptions{Timeout: time.Nanosecond}, + nil, + make(chan *v2.DiscoveryResponse), + func(m interface{}) error { return nil }) +} + +func createMockClientWithError() upstream.Client { + return mock.NewClient( + context.Background(), + CallOptions{Timeout: time.Nanosecond}, + fmt.Errorf("error"), + make(chan *v2.DiscoveryResponse), + func(m interface{}) error { return nil }) +} + +func createMockClientWithReponse( + t time.Duration, + r chan *v2.DiscoveryResponse, + sendCb func(m interface{}) error) upstream.Client { + return mock.NewClient(context.Background(), CallOptions{Timeout: t}, nil, r, sendCb) +} diff --git a/internal/app/upstream/mock/client.go b/internal/app/upstream/mock/client.go new file mode 100644 index 00000000..17343ffa --- /dev/null +++ b/internal/app/upstream/mock/client.go @@ -0,0 +1,202 @@ +package upstream + +import ( + "context" + "fmt" + "io" + + v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/envoyproxy/xds-relay/internal/app/upstream" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +// NewClient creates a mock implementation for testing +func NewClient( + ctx context.Context, + callOptions upstream.CallOptions, + errorOnCreate error, + receiveChan chan *v2.DiscoveryResponse, + sendCb func(m interface{}) error) upstream.Client { + return upstream.NewMockClient( + ctx, + createMockLdsClient(errorOnCreate, receiveChan, sendCb), + createMockRdsClient(errorOnCreate, receiveChan, sendCb), + createMockEdsClient(errorOnCreate, receiveChan, sendCb), + createMockCdsClient(errorOnCreate, receiveChan, sendCb), + callOptions, + ) +} + +func createMockLdsClient( + errorOnCreate error, + receiveChan chan *v2.DiscoveryResponse, + sendCb func(m interface{}) error) v2.ListenerDiscoveryServiceClient { + return &mockClient{errorOnStreamCreate: errorOnCreate, receiveChan: receiveChan, sendCb: sendCb} +} + +func createMockCdsClient( + errorOnCreate error, + receiveChan chan *v2.DiscoveryResponse, + sendCb func(m interface{}) error) v2.ClusterDiscoveryServiceClient { + return &mockClient{errorOnStreamCreate: errorOnCreate, receiveChan: receiveChan, sendCb: sendCb} +} + +func createMockRdsClient( + errorOnCreate error, + receiveChan chan *v2.DiscoveryResponse, + sendCb func(m interface{}) error) v2.RouteDiscoveryServiceClient { + return &mockClient{errorOnStreamCreate: errorOnCreate, receiveChan: receiveChan, sendCb: sendCb} +} + +func createMockEdsClient( + errorOnCreate error, + receiveChan chan *v2.DiscoveryResponse, + sendCb func(m interface{}) error) v2.EndpointDiscoveryServiceClient { + return &mockClient{errorOnStreamCreate: errorOnCreate, receiveChan: receiveChan, sendCb: sendCb} +} + +type mockClient struct { + errorOnStreamCreate error + receiveChan chan *v2.DiscoveryResponse + sendCb func(m interface{}) error +} + +type mockGrpcStream struct { + ctx context.Context + receiveChan chan *v2.DiscoveryResponse + sendCb func(m interface{}) error +} + +func (stream *mockGrpcStream) SendMsg(m interface{}) error { + return stream.sendCb(m) +} + +func (stream *mockGrpcStream) RecvMsg(m interface{}) error { + for { + select { + // https://github.com/grpc/grpc-go/issues/1894#issuecomment-370487012 + case <-stream.ctx.Done(): + return io.EOF + case resp := <-stream.receiveChan: + message := m.(*v2.DiscoveryResponse) + message.VersionInfo = resp.GetVersionInfo() + message.Nonce = resp.GetNonce() + message.TypeUrl = resp.GetTypeUrl() + message.Resources = resp.GetResources() + return nil + } + } +} + +func (stream *mockGrpcStream) Send(*v2.DiscoveryRequest) error { + return fmt.Errorf("Not implemented") +} + +func (stream *mockGrpcStream) Recv() (*v2.DiscoveryResponse, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (stream *mockGrpcStream) Header() (metadata.MD, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (stream *mockGrpcStream) Trailer() metadata.MD { + return nil +} + +func (stream *mockGrpcStream) CloseSend() error { + return nil +} + +func (stream *mockGrpcStream) Context() context.Context { + return stream.ctx +} + +func (c *mockClient) StreamListeners( + ctx context.Context, + opts ...grpc.CallOption) (v2.ListenerDiscoveryService_StreamListenersClient, error) { + if c.errorOnStreamCreate != nil { + return nil, c.errorOnStreamCreate + } + return &mockGrpcStream{ctx: ctx, receiveChan: c.receiveChan, sendCb: c.sendCb}, nil +} + +func (c *mockClient) StreamClusters( + ctx context.Context, + opts ...grpc.CallOption) (v2.ClusterDiscoveryService_StreamClustersClient, error) { + if c.errorOnStreamCreate != nil { + return nil, c.errorOnStreamCreate + } + return &mockGrpcStream{ctx: ctx, receiveChan: c.receiveChan, sendCb: c.sendCb}, nil +} + +func (c *mockClient) StreamRoutes( + ctx context.Context, + opts ...grpc.CallOption) (v2.RouteDiscoveryService_StreamRoutesClient, error) { + if c.errorOnStreamCreate != nil { + return nil, c.errorOnStreamCreate + } + return &mockGrpcStream{ctx: ctx, receiveChan: c.receiveChan, sendCb: c.sendCb}, nil +} + +func (c *mockClient) StreamEndpoints( + ctx context.Context, + opts ...grpc.CallOption) (v2.EndpointDiscoveryService_StreamEndpointsClient, error) { + if c.errorOnStreamCreate != nil { + return nil, c.errorOnStreamCreate + } + return &mockGrpcStream{ctx: ctx, receiveChan: c.receiveChan, sendCb: c.sendCb}, nil +} + +func (c *mockClient) DeltaListeners( + ctx context.Context, + opts ...grpc.CallOption) (v2.ListenerDiscoveryService_DeltaListenersClient, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (c *mockClient) DeltaClusters( + ctx context.Context, + opts ...grpc.CallOption) (v2.ClusterDiscoveryService_DeltaClustersClient, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (c *mockClient) DeltaRoutes( + ctx context.Context, + opts ...grpc.CallOption) (v2.RouteDiscoveryService_DeltaRoutesClient, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (c *mockClient) DeltaEndpoints( + ctx context.Context, + opts ...grpc.CallOption) (v2.EndpointDiscoveryService_DeltaEndpointsClient, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (c *mockClient) FetchListeners( + ctx context.Context, + in *v2.DiscoveryRequest, + opts ...grpc.CallOption) (*v2.DiscoveryResponse, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (c *mockClient) FetchClusters( + ctx context.Context, + in *v2.DiscoveryRequest, + opts ...grpc.CallOption) (*v2.DiscoveryResponse, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (c *mockClient) FetchRoutes( + ctx context.Context, + in *v2.DiscoveryRequest, + opts ...grpc.CallOption) (*v2.DiscoveryResponse, error) { + return nil, fmt.Errorf("Not implemented") +} + +func (c *mockClient) FetchEndpoints( + ctx context.Context, + in *v2.DiscoveryRequest, + opts ...grpc.CallOption) (*v2.DiscoveryResponse, error) { + return nil, fmt.Errorf("Not implemented") +}