Skip to content

Commit

Permalink
fix goroutine leak in watchMux
Browse files Browse the repository at this point in the history
Signed-off-by: yingjinhui <yingjinhui@didiglobal.com>
  • Loading branch information
ikaven1024 committed Nov 11, 2023
1 parent 1b2c6ed commit 496da7d
Showing 1 changed file with 58 additions and 41 deletions.
99 changes: 58 additions & 41 deletions pkg/search/proxy/store/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package store
import (
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"sort"
"sync"

Expand Down Expand Up @@ -195,9 +197,63 @@ func (w *watchMux) AddSource(watcher watch.Interface, decorator func(watch.Event

// Start run the watcher
func (w *watchMux) Start() {
for _, source := range w.sources {
go w.startWatchSource(source.watcher, source.decorator)
// Build select cases dynamically. The final result is like:
// select {
// case v, ok := <- w.sources[0].watcher.ResultChan():
// ...
// case v, ok := <- w.sources[1].watcher.ResultChan():
// ...
// case v, ok := <- w.sources[2].watcher.ResultChan():
// ...
// ...
// case <- w.done:
// ...
// }
cases := make([]reflect.SelectCase, len(w.sources)+1)
for i, source := range w.sources {
cases[i] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(source.watcher.ResultChan()),
}
}

// add done chan select case at last
doneCaseIndex := len(cases) - 1
cases[doneCaseIndex] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(w.done),
}

go func() {
defer close(w.result)
for {
chosen, val, ok := reflect.Select(cases)
if !ok {
return
}

if chosen == doneCaseIndex {
// Received from done chan.
// In fact, this will never happen
panic(fmt.Sprintf("unexpectedly receive from done chan: %v", val.Interface()))
} else {
// Received from source chan.
sourceEvent := val.Interface().(watch.Event)
// sourceEvent object is cacheObject,all watcher use the same point,must deepcopy.
copyEvent := *sourceEvent.DeepCopy()
if decorator := w.sources[chosen].decorator; decorator != nil {
decorator(copyEvent)
}

select {
case <-w.done:
return
case w.result <- copyEvent:
}
}
}

}()
}

// ResultChan implements watch.Interface
Expand All @@ -220,45 +276,6 @@ func (w *watchMux) Stop() {
case <-w.done:
default:
close(w.done)
close(w.result)
}
}

func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch.Event)) {
defer source.Stop()
defer w.Stop()
for {
var copyEvent watch.Event
select {
case sourceEvent, ok := <-source.ResultChan():
if !ok {
return
}
// sourceEvent object is cacheObject,all watcher use the same point,must deepcopy.
copyEvent = *sourceEvent.DeepCopy()
if decorator != nil {
decorator(copyEvent)
}
case <-w.done:
return
}

select {
case <-w.done:
return
default:
}

func() {
w.lock.RLock()
defer w.lock.RUnlock()
select {
case <-w.done:
return
default:
w.result <- copyEvent
}
}()
}
}

Expand Down

0 comments on commit 496da7d

Please sign in to comment.