Skip to content

Commit

Permalink
Backport adsc race condition fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
howardjohn committed Oct 17, 2019
1 parent b9e340b commit a6f6397
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 60 deletions.
24 changes: 12 additions & 12 deletions pilot/pkg/proxy/envoy/v2/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func TestEds(t *testing.T) {
})
t.Run("CDSSave", func(t *testing.T) {
// Moved from cds_test, using new client
if len(adsc.Clusters) == 0 {
if len(adsc.GetClusters()) == 0 {
t.Error("No clusters in ADS response")
}
strResponse, _ := json.MarshalIndent(adsc.Clusters, " ", " ")
strResponse, _ := json.MarshalIndent(adsc.GetClusters(), " ", " ")
_ = ioutil.WriteFile(env.IstioOut+"/cdsv2_sidecar.json", strResponse, 0644)

})
Expand Down Expand Up @@ -136,7 +136,7 @@ func adsConnectAndWait(t *testing.T, ip int) *adsc.ADSC {
t.Fatal("Error getting initial config ", err)
}

if len(adsc.EDS) == 0 {
if len(adsc.GetEndpoints()) == 0 {
t.Fatal("No endpoints")
}
return adsc
Expand Down Expand Up @@ -189,7 +189,7 @@ func testTCPEndpoints(expected string, adsc *adsc.ADSC, t *testing.T) {
// Verify server sends the endpoint. This check for a single endpoint with the given
// address.
func testEndpoints(expected string, cluster string, adsc *adsc.ADSC, t *testing.T) {
lbe, f := adsc.EDS[cluster]
lbe, f := adsc.GetEndpoints()[cluster]
if !f || len(lbe.Endpoints) == 0 {
t.Fatalf("No lb endpoints for %v, %v", cluster, adsc.EndpointsJSON())
}
Expand All @@ -210,12 +210,12 @@ func testEndpoints(expected string, cluster string, adsc *adsc.ADSC, t *testing.
}

func testLocalityPrioritizedEndpoints(adsc *adsc.ADSC, adsc2 *adsc.ADSC, t *testing.T) {
verifyLocalityPriorities(asdcLocality, adsc.EDS["outbound|80||locality.cluster.local"].GetEndpoints(), t)
verifyLocalityPriorities(asdc2Locality, adsc2.EDS["outbound|80||locality.cluster.local"].GetEndpoints(), t)
verifyLocalityPriorities(asdcLocality, adsc.GetEndpoints()["outbound|80||locality.cluster.local"].GetEndpoints(), t)
verifyLocalityPriorities(asdc2Locality, adsc2.GetEndpoints()["outbound|80||locality.cluster.local"].GetEndpoints(), t)

// No outlier detection specified for this cluster, so we shouldn't apply priority.
verifyNoLocalityPriorities(adsc.EDS["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t)
verifyNoLocalityPriorities(adsc2.EDS["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t)
verifyNoLocalityPriorities(adsc.GetEndpoints()["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t)
verifyNoLocalityPriorities(adsc2.GetEndpoints()["outbound|80||locality-no-outlier-detection.cluster.local"].GetEndpoints(), t)
}

// Tests that Services with multiple ports sharing the same port number are properly sent endpoints.
Expand Down Expand Up @@ -270,9 +270,9 @@ func verifyLocalityPriorities(proxyLocality string, eps []endpoint.LocalityLbEnd
// Verify server sends UDS endpoints
func testUdsEndpoints(_ *bootstrap.Server, adsc *adsc.ADSC, t *testing.T) {
// Check the UDS endpoint ( used to be separate test - but using old unused GRPC method)
// The new test also verifies CDS is pushing the UDS cluster, since adsc.EDS is
// The new test also verifies CDS is pushing the UDS cluster, since adsc.GetEndpoints() is
// populated using CDS response
lbe, f := adsc.EDS["outbound|0||localuds.cluster.local"]
lbe, f := adsc.GetEndpoints()["outbound|0||localuds.cluster.local"]
if !f || len(lbe.Endpoints) == 0 {
t.Error("No UDS lb endpoints")
} else {
Expand Down Expand Up @@ -414,7 +414,7 @@ func edsUpdateInc(server *bootstrap.Server, adsc *adsc.ADSC, t *testing.T) {
t.Fatal("Expecting EDS update as part of a partial push", err, upd)
}

lbe := adsc.EDS["outbound|8080||eds.test.svc.cluster.local"]
lbe := adsc.GetEndpoints()["outbound|8080||eds.test.svc.cluster.local"]
if len(lbe.Endpoints) != 0 {
t.Fatalf("There should be no endpoints for outbound|8080||eds.test.svc.cluster.local. Endpoints:\n%v", adsc.EndpointsJSON())
}
Expand Down Expand Up @@ -469,7 +469,7 @@ func multipleRequest(server *bootstrap.Server, inc bool, nclients,
return
}

if len(adsc.EDS) == 0 {
if len(adsc.GetEndpoints()) == 0 {
errChan <- errors.New("no endpoints")
wgConnect.Done()
return
Expand Down
42 changes: 21 additions & 21 deletions pilot/pkg/proxy/envoy/v2/lds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,18 @@ func TestLDSIsolated(t *testing.T) {
// 7071 (inbound), 2001 (service - also as http proxy), 15002 (http-proxy)
// We don't get mixer on 9091 or 15004 because there are no services defined in istio-system namespace
// in the none.yaml setup
if len(ldsr.HTTPListeners) != 3 {
if len(ldsr.GetHTTPListeners()) != 3 {
// TODO: we are still debating if for HTTP services we have any use case to create a 127.0.0.1:port outbound
// for the service (the http proxy is already covering this)
t.Error("HTTP listeners, expecting 5 got ", len(ldsr.HTTPListeners), ldsr.HTTPListeners)
t.Error("HTTP listeners, expecting 5 got ", len(ldsr.GetHTTPListeners()), ldsr.GetHTTPListeners())
}

// s1tcp:2000 outbound, bind=true (to reach other instances of the service)
// s1:5005 outbound, bind=true
// :443 - https external, bind=false
// 10.11.0.1_7070, bind=true -> inbound|2000|s1 - on port 7070, fwd to 37070
// virtual
if len(ldsr.TCPListeners) == 0 {
if len(ldsr.GetTCPListeners()) == 0 {
t.Fatal("No response")
}

Expand Down Expand Up @@ -240,21 +240,21 @@ func TestLDSWithDefaultSidecar(t *testing.T) {

// Expect 6 listeners : 1 orig_dst, 1 http inbound + 4 outbound (http, tcp1, istio-policy and istio-telemetry)
// plus 2 extra due to the mem registry
if (len(adsResponse.HTTPListeners) + len(adsResponse.TCPListeners)) != 6 {
t.Fatalf("Expected 8 listeners, got %d\n", len(adsResponse.HTTPListeners)+len(adsResponse.TCPListeners))
if (len(adsResponse.GetHTTPListeners()) + len(adsResponse.GetTCPListeners())) != 6 {
t.Fatalf("Expected 8 listeners, got %d\n", len(adsResponse.GetHTTPListeners())+len(adsResponse.GetTCPListeners()))
}

// Expect 10 CDS clusters: 1 inbound + 7 outbound (2 http services, 1 tcp service, 2 istio-system services,
// and 2 subsets of http1), 1 blackhole, 1 passthrough
// plus 2 extra due to the mem registry
if (len(adsResponse.Clusters) + len(adsResponse.EDSClusters)) != 10 {
t.Fatalf("Expected 12 Clusters in CDS output. Got %d", len(adsResponse.Clusters)+len(adsResponse.EDSClusters))
if (len(adsResponse.GetClusters()) + len(adsResponse.GetEdsClusters())) != 10 {
t.Fatalf("Expected 12 Clusters in CDS output. Got %d", len(adsResponse.GetClusters())+len(adsResponse.GetEdsClusters()))
}

// Expect two vhost blocks in RDS output for 8080 (one for http1, another for http2)
// plus one extra due to mem registry
if len(adsResponse.Routes["8080"].VirtualHosts) != 3 {
t.Fatalf("Expected two VirtualHosts in RDS output. Got %d", len(adsResponse.Routes["8080"].VirtualHosts))
if len(adsResponse.GetRoutes()["8080"].VirtualHosts) != 3 {
t.Fatalf("Expected two VirtualHosts in RDS output. Got %d", len(adsResponse.GetRoutes()["8080"].VirtualHosts))
}
}

Expand Down Expand Up @@ -304,13 +304,13 @@ func TestLDSWithIngressGateway(t *testing.T) {

// Expect 2 listeners : 1 for 80, 1 for 443
// where 443 listener has 3 filter chains
if (len(adsResponse.HTTPListeners) + len(adsResponse.TCPListeners)) != 2 {
t.Fatalf("Expected 2 listeners, got %d\n", len(adsResponse.HTTPListeners)+len(adsResponse.TCPListeners))
if (len(adsResponse.GetHTTPListeners()) + len(adsResponse.GetTCPListeners())) != 2 {
t.Fatalf("Expected 2 listeners, got %d\n", len(adsResponse.GetHTTPListeners())+len(adsResponse.GetTCPListeners()))
}

// TODO: This is flimsy. The ADSC code treats any listener with http connection manager as a HTTP listener
// instead of looking at it as a listener with multiple filter chains
l := adsResponse.HTTPListeners["0.0.0.0_443"]
l := adsResponse.GetHTTPListeners()["0.0.0.0_443"]

if l != nil {
if len(l.FilterChains) != 3 {
Expand Down Expand Up @@ -429,13 +429,13 @@ func TestLDSWithSidecarForWorkloadWithoutService(t *testing.T) {
}

// Expect 1 HTTP listeners for 8081
if len(adsResponse.HTTPListeners) != 1 {
t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.HTTPListeners))
if len(adsResponse.GetHTTPListeners()) != 1 {
t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.GetHTTPListeners()))
}

// TODO: This is flimsy. The ADSC code treats any listener with http connection manager as a HTTP listener
// instead of looking at it as a listener with multiple filter chains
if l := adsResponse.HTTPListeners["0.0.0.0_8081"]; l != nil {
if l := adsResponse.GetHTTPListeners()["0.0.0.0_8081"]; l != nil {
expected := 1
if pilot.RestrictPodIPTrafficLoops.Get() {
expected = 2
Expand All @@ -448,10 +448,10 @@ func TestLDSWithSidecarForWorkloadWithoutService(t *testing.T) {
}

// Expect only one EDS cluster for http1.ns1.svc.cluster.local
if len(adsResponse.EDSClusters) != 1 {
t.Fatalf("Expected 1 eds cluster, got %d", len(adsResponse.EDSClusters))
if len(adsResponse.GetEdsClusters()) != 1 {
t.Fatalf("Expected 1 eds cluster, got %d", len(adsResponse.GetEdsClusters()))
}
if cluster, ok := adsResponse.EDSClusters["outbound|8081||http1.ns1.svc.cluster.local"]; !ok {
if cluster, ok := adsResponse.GetEdsClusters()["outbound|8081||http1.ns1.svc.cluster.local"]; !ok {
t.Fatalf("Expected EDS cluster outbound|8081||http1.ns1.svc.cluster.local, got %v", cluster.Name)
}
}
Expand Down Expand Up @@ -530,12 +530,12 @@ func TestLDSEnvoyFilterWithWorkloadSelector(t *testing.T) {
}

// Expect 1 HTTP listeners for 8081
if len(adsResponse.HTTPListeners) != 1 {
t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.HTTPListeners))
if len(adsResponse.GetHTTPListeners()) != 1 {
t.Fatalf("Expected 1 http listeners, got %d", len(adsResponse.GetHTTPListeners()))
}
// TODO: This is flimsy. The ADSC code treats any listener with http connection manager as a HTTP listener
// instead of looking at it as a listener with multiple filter chains
l := adsResponse.HTTPListeners["0.0.0.0_8081"]
l := adsResponse.GetHTTPListeners()["0.0.0.0_8081"]

expectLuaFilter(t, l, test.expectLuaFilter)
})
Expand Down
93 changes: 67 additions & 26 deletions pkg/adsc/adsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,23 @@ type ADSC struct {
// InitialLoad tracks the time to receive the initial configuration.
InitialLoad time.Duration

// HTTPListeners contains received listeners with a http_connection_manager filter.
HTTPListeners map[string]*xdsapi.Listener
// httpListeners contains received listeners with a http_connection_manager filter.
httpListeners map[string]*xdsapi.Listener

// TCPListeners contains all listeners of type TCP (not-HTTP)
TCPListeners map[string]*xdsapi.Listener
// tcpListeners contains all listeners of type TCP (not-HTTP)
tcpListeners map[string]*xdsapi.Listener

// All received clusters of type EDS, keyed by name
EDSClusters map[string]*xdsapi.Cluster
// All received clusters of type eds, keyed by name
edsClusters map[string]*xdsapi.Cluster

// All received clusters of no-EDS type, keyed by name
Clusters map[string]*xdsapi.Cluster
// All received clusters of no-eds type, keyed by name
clusters map[string]*xdsapi.Cluster

// All received routes, keyed by route name
Routes map[string]*xdsapi.RouteConfiguration
routes map[string]*xdsapi.RouteConfiguration

// All received endpoints, keyed by cluster name
EDS map[string]*xdsapi.ClusterLoadAssignment
eds map[string]*xdsapi.ClusterLoadAssignment

// DumpCfg will print all received config
DumpCfg bool
Expand Down Expand Up @@ -210,9 +210,6 @@ func tlsConfig(certDir string) (*tls.Config, error) {
// Close the stream.
func (a *ADSC) Close() {
a.mutex.Lock()
if a.stream != nil {
_ = a.stream.CloseSend()
}
a.conn.Close()
a.mutex.Unlock()
}
Expand Down Expand Up @@ -367,8 +364,8 @@ func (a *ADSC) handleLDS(ll []*xdsapi.Listener) {
if len(routes) > 0 {
a.sendRsc(routeType, routes)
}
a.HTTPListeners = lh
a.TCPListeners = lt
a.httpListeners = lh
a.tcpListeners = lt

select {
case a.Updates <- "lds":
Expand Down Expand Up @@ -406,47 +403,49 @@ type Endpoint struct {

// Save will save the json configs to files, using the base directory
func (a *ADSC) Save(base string) error {
strResponse, err := json.MarshalIndent(a.TCPListeners, " ", " ")
a.mutex.Lock()
defer a.mutex.Unlock()
strResponse, err := json.MarshalIndent(a.tcpListeners, " ", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(base+"_lds_tcp.json", strResponse, 0644)
if err != nil {
return err
}
strResponse, err = json.MarshalIndent(a.HTTPListeners, " ", " ")
strResponse, err = json.MarshalIndent(a.httpListeners, " ", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(base+"_lds_http.json", strResponse, 0644)
if err != nil {
return err
}
strResponse, err = json.MarshalIndent(a.Routes, " ", " ")
strResponse, err = json.MarshalIndent(a.routes, " ", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(base+"_rds.json", strResponse, 0644)
if err != nil {
return err
}
strResponse, err = json.MarshalIndent(a.EDSClusters, " ", " ")
strResponse, err = json.MarshalIndent(a.edsClusters, " ", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(base+"_ecds.json", strResponse, 0644)
if err != nil {
return err
}
strResponse, err = json.MarshalIndent(a.Clusters, " ", " ")
strResponse, err = json.MarshalIndent(a.clusters, " ", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(base+"_cds.json", strResponse, 0644)
if err != nil {
return err
}
strResponse, err = json.MarshalIndent(a.EDS, " ", " ")
strResponse, err = json.MarshalIndent(a.eds, " ", " ")
if err != nil {
return err
}
Expand Down Expand Up @@ -489,8 +488,8 @@ func (a *ADSC) handleCDS(ll []*xdsapi.Cluster) {

a.mutex.Lock()
defer a.mutex.Unlock()
a.EDSClusters = edscds
a.Clusters = cds
a.edsClusters = edscds
a.clusters = cds

select {
case a.Updates <- "cds":
Expand Down Expand Up @@ -552,7 +551,7 @@ func (a *ADSC) handleEDS(eds []*xdsapi.ClusterLoadAssignment) {

a.mutex.Lock()
defer a.mutex.Unlock()
a.EDS = la
a.eds = la

select {
case a.Updates <- "eds":
Expand Down Expand Up @@ -593,7 +592,7 @@ func (a *ADSC) handleRDS(configurations []*xdsapi.RouteConfiguration) {
}

a.mutex.Lock()
a.Routes = rds
a.routes = rds
a.mutex.Unlock()

select {
Expand Down Expand Up @@ -633,7 +632,7 @@ func (a *ADSC) Wait(update string, to time.Duration) (string, error) {

// EndpointsJSON returns the endpoints, formatted as JSON, for debugging.
func (a *ADSC) EndpointsJSON() string {
	// a.eds is written by handleEDS under a.mutex; take the lock here too so
	// this debug helper does not race with an in-flight EDS update.
	a.mutex.Lock()
	defer a.mutex.Unlock()
	// Marshal errors are deliberately ignored: this is best-effort debug output.
	out, _ := json.MarshalIndent(a.eds, " ", " ")
	return string(out)
}

Expand Down Expand Up @@ -665,3 +664,45 @@ func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) {
VersionInfo: msg.VersionInfo,
})
}

// GetHTTPListeners returns all the http listeners.
func (a *ADSC) GetHTTPListeners() map[string]*xdsapi.Listener {
	// Snapshot the map reference under the lock, then return it outside.
	a.mutex.Lock()
	listeners := a.httpListeners
	a.mutex.Unlock()
	return listeners
}

// GetTCPListeners returns all the tcp listeners.
func (a *ADSC) GetTCPListeners() map[string]*xdsapi.Listener {
	// Snapshot the map reference under the lock, then return it outside.
	a.mutex.Lock()
	listeners := a.tcpListeners
	a.mutex.Unlock()
	return listeners
}

// GetEdsClusters returns all the eds type clusters.
func (a *ADSC) GetEdsClusters() map[string]*xdsapi.Cluster {
	// Snapshot the map reference under the lock, then return it outside.
	a.mutex.Lock()
	clusters := a.edsClusters
	a.mutex.Unlock()
	return clusters
}

// GetClusters returns all the non-eds type clusters.
func (a *ADSC) GetClusters() map[string]*xdsapi.Cluster {
	// Snapshot the map reference under the lock, then return it outside.
	a.mutex.Lock()
	clusters := a.clusters
	a.mutex.Unlock()
	return clusters
}

// GetRoutes returns all the routes.
func (a *ADSC) GetRoutes() map[string]*xdsapi.RouteConfiguration {
	// Snapshot the map reference under the lock, then return it outside.
	a.mutex.Lock()
	routes := a.routes
	a.mutex.Unlock()
	return routes
}

// GetEndpoints returns all the endpoints, keyed by cluster name.
// (The previous comment said "routes" — a copy-paste of the GetRoutes doc.)
func (a *ADSC) GetEndpoints() map[string]*xdsapi.ClusterLoadAssignment {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	return a.eds
}
2 changes: 1 addition & 1 deletion tests/e2e/tests/pilot/performance/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func adsConnectAndWait(n int, pilotAddr string, t *testing.T) (adscs []*adsc.ADS
t.Fatal(err)
}

if len(c.EDS) == 0 {
if len(c.GetEndpoints()) == 0 {
t.Fatalf("No endpoints")
}
adscs = append(adscs, c)
Expand Down

0 comments on commit a6f6397

Please sign in to comment.