Skip to content

Commit

Permalink
Merge master (#2)
Browse files Browse the repository at this point in the history
* Don't extend TTL for AddWatch (envoyproxy#60)

Signed-off-by: Jyoti Mahapatra <jmahapatra@lyft.com>
Signed-off-by: Jess Yuen <jyuen@lyft.com>
  • Loading branch information
jyotimahapatra authored and Jess Yuen committed Apr 24, 2020
1 parent 2f15c34 commit cc6284e
Show file tree
Hide file tree
Showing 10 changed files with 720 additions and 106 deletions.
13 changes: 0 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,16 @@ github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334 h1:VHgatEHNcBFEB7inlalqfNqw65aNkM1lGX2yt3NmbS8=
github.com/iancoleman/strcase v0.0.0-20191112232945-16388991a334/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lyft/protoc-gen-star v0.4.14 h1:HUkD4H4dYFIgu3Bns/3N6J5GmKHCEGnhYBwNu3fvXgA=
github.com/lyft/protoc-gen-star v0.4.14/go.mod h1:mE8fbna26u7aEA2QCVvvfBU/ZrPgocG1206xAFPcs94=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand All @@ -109,7 +105,6 @@ github.com/onsi/gomega v1.9.0 h1:R1uwffexN6Pr340GtYRIdZmAiN4J+iw6WG4wog1DUXg=
github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand All @@ -131,7 +126,6 @@ github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPx
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.6 h1:breEStsVwemnKh2/s6gMvSdMEkwW0sK8vGStnlVBMCs=
Expand All @@ -157,7 +151,6 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.14.0 h1:/pduUoebOeeJzTDFuoMgC6nRkiasr1sBCIEorly7m4o=
Expand All @@ -169,7 +162,6 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -207,9 +199,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand All @@ -221,7 +211,6 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.1 h1:zvIju4sqAGvwKspUQOhwnpcqSbzi7/H6QomNNjTL4sk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
Expand All @@ -233,7 +222,6 @@ google.golang.org/protobuf v1.20.1 h1:ESRXHgpUBG5D2I5mmsQIyYxB/tQIZfSZ8wLyFDf/N/
google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5LIWeI/lY=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
Expand All @@ -248,5 +236,4 @@ gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
1 change: 0 additions & 1 deletion internal/app/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (c *cache) AddRequest(key string, req *v2.DiscoveryRequest) error {
return fmt.Errorf("unable to cast cache value to type resource for key: %s", key)
}
resource.Requests = append(resource.Requests, req)
resource.ExpirationTime = c.getExpirationTime(time.Now())
c.cache.Add(key, resource)
return nil
}
Expand Down
16 changes: 8 additions & 8 deletions internal/app/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (o *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func())
// streams.
o.upstreamResponseMap.mu.Lock()
if _, ok := o.upstreamResponseMap.responseChannel[aggregatedKey]; !ok {
upstreamResponseChan := o.upstreamClient.OpenStream(ctx, &req)
upstreamResponseChan, _, _ := o.upstreamClient.OpenStream(req)
respChannel := o.upstreamResponseMap.add(aggregatedKey, upstreamResponseChan)
// Spin up a go routine to watch for upstream responses.
// One routine is opened per aggregate key.
Expand All @@ -187,24 +187,24 @@ func (o *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp.
func (o *orchestrator) watchUpstream(
ctx context.Context,
aggregatedKey string,
responseChannel <-chan *upstream.Response,
responseChannel <-chan *discovery.DiscoveryResponse,
done <-chan bool,
) {
for {
select {
case x := <-responseChannel:
if x.Err != nil {
case x, more := <-responseChannel:
if !more {
// A problem occurred fetching the response upstream, log and
// return the most recent cached response, so that the
// downstream will reissue the discovery request.
o.logger.With("err", x.Err).With("key", aggregatedKey).Error(ctx, "upstream error")
o.logger.With("key", aggregatedKey).Error(ctx, "upstream error")
} else {
// Cache the response.
_, err := o.cache.SetResponse(aggregatedKey, x.Response)
_, err := o.cache.SetResponse(aggregatedKey, *x)
if err != nil {
// If we fail to cache the new response, log and return the old one.
o.logger.With("err", err).With("key", aggregatedKey).
With("resp", x.Response).Error(ctx, "Failed to cache the response")
Error(ctx, "Failed to cache the response")
}
}

Expand All @@ -220,7 +220,7 @@ func (o *orchestrator) watchUpstream(
} else {
if cached == nil || cached.Resp == nil {
// If cache is empty, there is nothing to fan out.
if x.Err != nil {
if !more {
// Warn. Benefit of the doubt that this is the first request.
o.logger.With("key", aggregatedKey).
Warn(ctx, "attempted to fan out with no cached response")
Expand Down
109 changes: 53 additions & 56 deletions internal/app/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,41 @@ import (
"github.com/envoyproxy/xds-relay/internal/app/cache"
"github.com/envoyproxy/xds-relay/internal/app/mapper"
"github.com/envoyproxy/xds-relay/internal/app/upstream"
upstream_mock "github.com/envoyproxy/xds-relay/internal/app/upstream/mock"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util"
aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"
bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1"
)

type mockSimpleUpstreamClient struct {
responseChan <-chan *upstream.Response
responseChan <-chan *v2.DiscoveryResponse
}

func (m mockSimpleUpstreamClient) OpenStream(ctx context.Context, req *v2.DiscoveryRequest) <-chan *upstream.Response {
return m.responseChan
func (m mockSimpleUpstreamClient) OpenStream(req v2.DiscoveryRequest) (<-chan *v2.DiscoveryResponse, func(), error) {
return m.responseChan, func() {}, nil
}

type mockMultiStreamUpstreamClient struct {
ldsResponseChan <-chan *upstream.Response
cdsResponseChan <-chan *upstream.Response
ldsResponseChan <-chan *v2.DiscoveryResponse
cdsResponseChan <-chan *v2.DiscoveryResponse

t *testing.T
mapper mapper.Mapper
}

func (m mockMultiStreamUpstreamClient) OpenStream(ctx context.Context,
req *v2.DiscoveryRequest) <-chan *upstream.Response {
aggregatedKey, err := m.mapper.GetKey(*req)
func (m mockMultiStreamUpstreamClient) OpenStream(req v2.DiscoveryRequest) (<-chan *v2.DiscoveryResponse, func(), error) {
aggregatedKey, err := m.mapper.GetKey(req)
assert.NoError(m.t, err)

if aggregatedKey == "lds" {
return m.ldsResponseChan
return m.ldsResponseChan, func() {}, nil
} else if aggregatedKey == "cds" {
return m.cdsResponseChan
return m.cdsResponseChan, func() {}, nil
}

m.t.Errorf("Unsupported aggregated key, %s", aggregatedKey)
return nil
return nil, func() {}, nil
}

func newMockOrchestrator(t *testing.T, mapper mapper.Mapper, upstreamClient upstream.Client) *orchestrator {
Expand Down Expand Up @@ -96,8 +96,12 @@ func assertEqualResources(t *testing.T, got gcp.Response, expected v2.DiscoveryR

func TestNew(t *testing.T) {
// Trivial test to ensure orchestrator instantiates.
upstreamClient, err := upstream.NewClient(context.Background(), "example.com")
assert.NoError(t, err)
upstreamClient := upstream_mock.NewClient(
context.Background(),
upstream.CallOptions{},
nil,
nil,
func(m interface{}) error { return nil })

config := aggregationv1.KeyerConfiguration{
Fragments: []*aggregationv1.KeyerConfiguration_Fragment{
Expand All @@ -120,7 +124,7 @@ func TestNew(t *testing.T) {
}

func TestGoldenPath(t *testing.T) {
upstreamResponseChannel := make(chan *upstream.Response)
upstreamResponseChannel := make(chan *v2.DiscoveryResponse)
mapper := newMockMapper(t)
orchestrator := newMockOrchestrator(
t,
Expand All @@ -140,21 +144,19 @@ func TestGoldenPath(t *testing.T) {
assert.Equal(t, 1, len(orchestrator.downstreamResponseMap.responseChannel))
assert.Equal(t, 1, len(orchestrator.upstreamResponseMap.responseChannel))

upstreamResponse := upstream.Response{
Response: v2.DiscoveryResponse{
VersionInfo: "1",
TypeUrl: "type.googleapis.com/envoy.api.v2.Listener",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("lds resource"),
},
resp := v2.DiscoveryResponse{
VersionInfo: "1",
TypeUrl: "type.googleapis.com/envoy.api.v2.Listener",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("lds resource"),
},
},
}
upstreamResponseChannel <- &upstreamResponse
upstreamResponseChannel <- &resp

gotResponse := <-respChannel
assertEqualResources(t, gotResponse, upstreamResponse.Response, req)
assertEqualResources(t, gotResponse, resp, req)

aggregatedKey, err := mapper.GetKey(req)
assert.NoError(t, err)
Expand All @@ -166,7 +168,7 @@ func TestGoldenPath(t *testing.T) {
}

func TestCachedResponse(t *testing.T) {
upstreamResponseChannel := make(chan *upstream.Response)
upstreamResponseChannel := make(chan *v2.DiscoveryResponse)
mapper := newMockMapper(t)
orchestrator := newMockOrchestrator(
t,
Expand Down Expand Up @@ -208,20 +210,19 @@ func TestCachedResponse(t *testing.T) {
assertEqualResources(t, gotResponse, mockResponse, req)

// Attempt pushing a more recent response from upstream.
upstreamResponse := upstream.Response{
Response: v2.DiscoveryResponse{
VersionInfo: "2",
TypeUrl: "type.googleapis.com/envoy.api.v2.Listener",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("some other lds resource"),
},
resp := v2.DiscoveryResponse{
VersionInfo: "2",
TypeUrl: "type.googleapis.com/envoy.api.v2.Listener",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("some other lds resource"),
},
},
}
upstreamResponseChannel <- &upstreamResponse

upstreamResponseChannel <- &resp
gotResponse = <-respChannel
assertEqualResources(t, gotResponse, upstreamResponse.Response, req)
assertEqualResources(t, gotResponse, resp, req)
assert.Equal(t, 1, len(orchestrator.upstreamResponseMap.responseChannel))

// Test scenario with same request and response version.
Expand All @@ -247,8 +248,8 @@ func TestCachedResponse(t *testing.T) {
}

func TestMultipleWatchesAndUpstreams(t *testing.T) {
upstreamResponseChannelLDS := make(chan *upstream.Response)
upstreamResponseChannelCDS := make(chan *upstream.Response)
upstreamResponseChannelLDS := make(chan *v2.DiscoveryResponse)
upstreamResponseChannelCDS := make(chan *v2.DiscoveryResponse)
mapper := newMockMapper(t)
orchestrator := newMockOrchestrator(
t,
Expand Down Expand Up @@ -279,25 +280,21 @@ func TestMultipleWatchesAndUpstreams(t *testing.T) {
respChannel3, cancelWatch3 := orchestrator.CreateWatch(req3)
assert.NotNil(t, respChannel3)

upstreamResponseLDS := upstream.Response{
Response: v2.DiscoveryResponse{
VersionInfo: "1",
TypeUrl: "type.googleapis.com/envoy.api.v2.Listener",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("lds resource"),
},
upstreamResponseLDS := v2.DiscoveryResponse{
VersionInfo: "1",
TypeUrl: "type.googleapis.com/envoy.api.v2.Listener",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("lds resource"),
},
},
}
upstreamResponseCDS := upstream.Response{
Response: v2.DiscoveryResponse{
VersionInfo: "1",
TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("cds resource"),
},
upstreamResponseCDS := v2.DiscoveryResponse{
VersionInfo: "1",
TypeUrl: "type.googleapis.com/envoy.api.v2.Cluster",
Resources: []*any.Any{
&anypb.Any{
Value: []byte("cds resource"),
},
},
}
Expand All @@ -317,9 +314,9 @@ func TestMultipleWatchesAndUpstreams(t *testing.T) {
assert.Equal(t, 3, len(orchestrator.downstreamResponseMap.responseChannel))
assert.Equal(t, 2, len(orchestrator.upstreamResponseMap.responseChannel))

assertEqualResources(t, gotResponseFromChannel1, upstreamResponseLDS.Response, req1)
assertEqualResources(t, gotResponseFromChannel2, upstreamResponseLDS.Response, req2)
assertEqualResources(t, gotResponseFromChannel3, upstreamResponseCDS.Response, req3)
assertEqualResources(t, gotResponseFromChannel1, upstreamResponseLDS, req1)
assertEqualResources(t, gotResponseFromChannel2, upstreamResponseLDS, req2)
assertEqualResources(t, gotResponseFromChannel3, upstreamResponseCDS, req3)

orchestrator.shutdown(aggregatedKeyLDS)
orchestrator.shutdown(aggregatedKeyCDS)
Expand Down
6 changes: 3 additions & 3 deletions internal/app/orchestrator/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package orchestrator
import (
"sync"

"github.com/envoyproxy/xds-relay/internal/app/upstream"
discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
)

// upstream is the map of aggregate key to the receive-only upstream
Expand All @@ -14,7 +14,7 @@ type upstreamResponseMap struct {
}

type upstreamResponseChannel struct {
response <-chan *upstream.Response
response <-chan *discovery.DiscoveryResponse
done chan bool
}

Expand All @@ -24,7 +24,7 @@ type upstreamResponseChannel struct {
// Expects orchestrator to manage the lock since this is called simultaneously
// with stream open.
func (u *upstreamResponseMap) add(aggregatedKey string,
responseChannel <-chan *upstream.Response) upstreamResponseChannel {
responseChannel <-chan *discovery.DiscoveryResponse) upstreamResponseChannel {
channel := upstreamResponseChannel{
response: responseChannel,
done: make(chan bool, 1),
Expand Down
Loading

0 comments on commit cc6284e

Please sign in to comment.