Skip to content

Commit

Permalink
[release-1.12] [BugFix] WasmPlugin: Call forwardToEnvoy in the same t…
Browse files Browse the repository at this point in the history
…hread (#39176)

* Call forwardToEnvoy in the same thread

* Update pkg/istio-agent/xds_proxy.go

Co-authored-by: John Howard <howardjohn@google.com>

Co-authored-by: Ingwon Song <igsong@google.com>
Co-authored-by: Ingwon Song <102102227+ingwonsong@users.noreply.github.com>
Co-authored-by: John Howard <howardjohn@google.com>
  • Loading branch information
4 people committed May 27, 2022
1 parent 4dbbd88 commit c904f2f
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions pkg/istio-agent/xds_proxy.go
Expand Up @@ -473,6 +473,7 @@ func (p *XdsProxy) handleUpstreamRequest(con *ProxyConnection) {
}

func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
forwardEnvoyCh := make(chan *discovery.DiscoveryResponse, 1)
for {
select {
case resp := <-con.responsesChan:
Expand Down Expand Up @@ -506,7 +507,15 @@ func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
case v3.ExtensionConfigurationType:
if features.WasmRemoteLoadConversion {
// If Wasm remote load conversion feature is enabled, rewrite and send.
go p.rewriteAndForward(con, resp)
go p.rewriteAndForward(con, resp, func(resp *discovery.DiscoveryResponse) {
// Forward the response using the thread of `handleUpstreamResponse`
// to prevent concurrent access to forwardToEnvoy
select {
case forwardEnvoyCh <- resp:
proxyLog.Debugf("wasm send response: %v", resp.TypeUrl)
case <-con.stopChan:
}
})
} else {
// Otherwise, forward ECDS resource update directly to Envoy.
forwardToEnvoy(con, resp)
Expand All @@ -518,13 +527,15 @@ func (p *XdsProxy) handleUpstreamResponse(con *ProxyConnection) {
forwardToEnvoy(con, resp)
}
}
case resp := <-forwardEnvoyCh:
forwardToEnvoy(con, resp)
case <-con.stopChan:
return
}
}
}

func (p *XdsProxy) rewriteAndForward(con *ProxyConnection, resp *discovery.DiscoveryResponse) {
func (p *XdsProxy) rewriteAndForward(con *ProxyConnection, resp *discovery.DiscoveryResponse, forward func(resp *discovery.DiscoveryResponse)) {
sendNack := wasm.MaybeConvertWasmExtensionConfig(resp.Resources, p.wasmCache)
if sendNack {
proxyLog.Debugf("sending NACK for ECDS resources %+v", resp.Resources)
Expand All @@ -540,7 +551,7 @@ func (p *XdsProxy) rewriteAndForward(con *ProxyConnection, resp *discovery.Disco
return
}
proxyLog.Debugf("forward ECDS resources %+v", resp.Resources)
forwardToEnvoy(con, resp)
forward(resp)
}

func (p *XdsProxy) forwardToTap(resp *discovery.DiscoveryResponse) {
Expand Down

0 comments on commit c904f2f

Please sign in to comment.