diff --git a/src/core/pipe.go b/src/core/pipe.go index cf60df5ba..39cf5f85e 100644 --- a/src/core/pipe.go +++ b/src/core/pipe.go @@ -25,7 +25,7 @@ type MessagePipe struct { plugins []Plugin ctx context.Context cancel context.CancelFunc - wg sync.WaitGroup + mu sync.Mutex bus messagebus.MessageBus } @@ -36,15 +36,15 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { plugins: make([]Plugin, 0, MaxPlugins), ctx: pipeContext, cancel: pipeCancel, - wg: sync.WaitGroup{}, + mu: sync.Mutex{}, } } func (p *MessagePipe) Register(size int, plugins ...Plugin) error { + p.mu.Lock() + p.plugins = append(p.plugins, plugins...) - p.bus = messagebus.New(size) - p.wg.Add(1) - defer p.wg.Done() + p.bus = messagebus.New(size) for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { @@ -55,6 +55,7 @@ func (p *MessagePipe) Register(size int, plugins ...Plugin) error { } } + p.mu.Unlock() return nil } @@ -74,12 +75,17 @@ func (p *MessagePipe) Run() { for { select { case <-p.ctx.Done(): - p.shutdown() + + for _, r := range p.plugins { + r.Close() + } + close(p.messageChannel) + return case m := <-p.messageChannel: - p.wg.Add(1) - defer p.wg.Done() + p.mu.Lock() p.bus.Publish(m.Topic(), m) + p.mu.Unlock() } } } @@ -101,10 +107,3 @@ func (p *MessagePipe) initPlugins() { r.Init(p) } } - -func (p *MessagePipe) shutdown() { - for _, r := range p.plugins { - r.Close() - } - close(p.messageChannel) -} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go index cf60df5ba..39cf5f85e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -25,7 +25,7 @@ type MessagePipe struct { plugins []Plugin ctx context.Context cancel context.CancelFunc - wg sync.WaitGroup + mu sync.Mutex bus messagebus.MessageBus } @@ -36,15 +36,15 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { plugins: make([]Plugin, 0, MaxPlugins), ctx: pipeContext, cancel: pipeCancel, - wg: sync.WaitGroup{}, + mu: sync.Mutex{}, } } func (p *MessagePipe) Register(size int, plugins ...Plugin) error { + p.mu.Lock() + p.plugins = append(p.plugins, plugins...) - p.bus = messagebus.New(size) - p.wg.Add(1) - defer p.wg.Done() + p.bus = messagebus.New(size) for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { @@ -55,6 +55,7 @@ func (p *MessagePipe) Register(size int, plugins ...Plugin) error { } } + p.mu.Unlock() return nil } @@ -74,12 +75,17 @@ func (p *MessagePipe) Run() { for { select { case <-p.ctx.Done(): - p.shutdown() + + for _, r := range p.plugins { + r.Close() + } + close(p.messageChannel) + return case m := <-p.messageChannel: - p.wg.Add(1) - defer p.wg.Done() + p.mu.Lock() p.bus.Publish(m.Topic(), m) + p.mu.Unlock() } } } @@ -101,10 +107,3 @@ func (p *MessagePipe) initPlugins() { r.Init(p) } } - -func (p *MessagePipe) shutdown() { - for _, r := range p.plugins { - r.Close() - } - close(p.messageChannel) -}