From dd00a51a46d1092ac64a190a2ade8a6fa6481f5a Mon Sep 17 00:00:00 2001 From: "o.omahony" Date: Thu, 20 Oct 2022 10:18:42 +0100 Subject: [PATCH 1/2] Move wg to lock --- src/core/pipe.go | 27 +++++++++---------- .../nginx/agent/v2/src/core/pipe.go | 27 +++++++++---------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/src/core/pipe.go b/src/core/pipe.go index cf60df5ba..986e67cac 100644 --- a/src/core/pipe.go +++ b/src/core/pipe.go @@ -25,7 +25,7 @@ type MessagePipe struct { plugins []Plugin ctx context.Context cancel context.CancelFunc - wg sync.WaitGroup + mu sync.Mutex bus messagebus.MessageBus } @@ -36,15 +36,16 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { plugins: make([]Plugin, 0, MaxPlugins), ctx: pipeContext, cancel: pipeCancel, - wg: sync.WaitGroup{}, + mu: sync.Mutex{}, } } func (p *MessagePipe) Register(size int, plugins ...Plugin) error { p.plugins = append(p.plugins, plugins...) p.bus = messagebus.New(size) - p.wg.Add(1) - defer p.wg.Done() + + p.mu.Lock() + defer p.mu.Unlock() for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { @@ -74,11 +75,16 @@ func (p *MessagePipe) Run() { for { select { case <-p.ctx.Done(): - p.shutdown() + + for _, r := range p.plugins { + r.Close() + } + close(p.messageChannel) + return case m := <-p.messageChannel: - p.wg.Add(1) - defer p.wg.Done() + p.mu.Lock() + defer p.mu.Unlock() p.bus.Publish(m.Topic(), m) } } @@ -101,10 +107,3 @@ func (p *MessagePipe) initPlugins() { r.Init(p) } } - -func (p *MessagePipe) shutdown() { - for _, r := range p.plugins { - r.Close() - } - close(p.messageChannel) -} diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go index cf60df5ba..986e67cac 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -25,7 +25,7 @@ type MessagePipe struct { plugins []Plugin ctx context.Context cancel context.CancelFunc - wg sync.WaitGroup + mu sync.Mutex bus messagebus.MessageBus } @@ -36,15 +36,16 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { plugins: make([]Plugin, 0, MaxPlugins), ctx: pipeContext, cancel: pipeCancel, - wg: sync.WaitGroup{}, + mu: sync.Mutex{}, } } func (p *MessagePipe) Register(size int, plugins ...Plugin) error { p.plugins = append(p.plugins, plugins...) p.bus = messagebus.New(size) - p.wg.Add(1) - defer p.wg.Done() + + p.mu.Lock() + defer p.mu.Unlock() for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { @@ -74,11 +75,16 @@ func (p *MessagePipe) Run() { for { select { case <-p.ctx.Done(): - p.shutdown() + + for _, r := range p.plugins { + r.Close() + } + close(p.messageChannel) + return case m := <-p.messageChannel: - p.wg.Add(1) - defer p.wg.Done() + p.mu.Lock() + defer p.mu.Unlock() p.bus.Publish(m.Topic(), m) } } @@ -101,10 +107,3 @@ func (p *MessagePipe) initPlugins() { r.Init(p) } } - -func (p *MessagePipe) shutdown() { - for _, r := range p.plugins { - r.Close() - } - close(p.messageChannel) -} From ca686bdf0f1a9c6d552368b34846113d962158b2 Mon Sep 17 00:00:00 2001 From: "o.omahony" Date: Thu, 20 Oct 2022 11:31:25 +0100 Subject: [PATCH 2/2] removed the defer --- src/core/pipe.go | 10 +++++----- .../vendor/github.com/nginx/agent/v2/src/core/pipe.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/core/pipe.go b/src/core/pipe.go index 986e67cac..39cf5f85e 100644 --- a/src/core/pipe.go +++ b/src/core/pipe.go @@ -41,11 +41,10 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { } func (p *MessagePipe) Register(size int, plugins ...Plugin) error { + p.mu.Lock() + p.plugins = append(p.plugins, plugins...) - p.bus = messagebus.New(size) - - p.mu.Lock() - defer p.mu.Unlock() + p.bus = messagebus.New(size) for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { @@ -56,6 +55,7 @@ func (p *MessagePipe) Register(size int, plugins ...Plugin) error { } } + p.mu.Unlock() return nil } @@ -84,8 +84,8 @@ func (p *MessagePipe) Run() { return case m := <-p.messageChannel: p.mu.Lock() - defer p.mu.Unlock() p.bus.Publish(m.Topic(), m) + p.mu.Unlock() } } } diff --git a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go index 986e67cac..39cf5f85e 100644 --- a/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go +++ b/test/performance/vendor/github.com/nginx/agent/v2/src/core/pipe.go @@ -41,11 +41,10 @@ func NewMessagePipe(ctx context.Context) *MessagePipe { } func (p *MessagePipe) Register(size int, plugins ...Plugin) error { + p.mu.Lock() + p.plugins = append(p.plugins, plugins...) - p.bus = messagebus.New(size) - - p.mu.Lock() - defer p.mu.Unlock() + p.bus = messagebus.New(size) for _, plugin := range p.plugins { for _, subscription := range plugin.Subscriptions() { @@ -56,6 +55,7 @@ func (p *MessagePipe) Register(size int, plugins ...Plugin) error { } } + p.mu.Unlock() return nil } @@ -84,8 +84,8 @@ func (p *MessagePipe) Run() { return case m := <-p.messageChannel: p.mu.Lock() - defer p.mu.Unlock() p.bus.Publish(m.Topic(), m) + p.mu.Unlock() } } }