
fix(kds): Avoid calling Send() from different goroutines #2573

Merged
lahabana merged 4 commits into kumahq:master from fix/kdsMuxGetsStuck on Sep 7, 2021

Conversation

lahabana
Contributor

Rewrote most of KDS mux as it wasn't working as intended.
KDS mux now ensures:

  • Recv() on underlying streams are always called in the same goroutine
  • Send() on underlying streams are always called in the same goroutine
  • Close() actually waits for some goroutine to terminate

Also added extra tests to ensure these invariants hold (see the sketch below for the general serialization idea).

Fixes #2492
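For illustration, a minimal, self-contained sketch of the general idea (not the PR's actual code; the sender type and its API are made up for this example): a single goroutine owns the underlying Send(), other goroutines hand it work over a channel, so Send() is never called concurrently, and Close() waits for that goroutine to exit.

package main

import (
        "fmt"
        "sync"
)

// request pairs a payload with a channel for the send result.
type request struct {
        payload string
        result  chan error
}

// sender funnels all sends through one goroutine.
type sender struct {
        requests chan request
        done     chan struct{}
        wg       sync.WaitGroup
}

func newSender(send func(string) error) *sender {
        s := &sender{requests: make(chan request), done: make(chan struct{})}
        s.wg.Add(1)
        go func() { // the only goroutine that ever calls send()
                defer s.wg.Done()
                for {
                        select {
                        case r := <-s.requests:
                                r.result <- send(r.payload)
                        case <-s.done:
                                return
                        }
                }
        }()
        return s
}

// Send may be called from any goroutine; the real send happens in one place.
// A production version also has to handle Send racing with Close, which is
// exactly what the review discussion below is about.
func (s *sender) Send(payload string) error {
        r := request{payload: payload, result: make(chan error, 1)}
        s.requests <- r
        return <-r.result
}

// Close signals the goroutine and waits for it to terminate.
func (s *sender) Close() {
        close(s.done)
        s.wg.Wait()
}

func main() {
        s := newSender(func(p string) error { fmt.Println("sent:", p); return nil })
        _ = s.Send("hello")
        s.Close()
}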

@bchatelard
Contributor

Thanks! I just tested this fix (cherry-picked onto 1.2.3) and it looks like it solves my issue #2492

@@ -75,20 +80,23 @@ func (c *client) Start(stop <-chan struct{}) (errs error) {
"client-id", c.clientID,
KDSVersionHeaderKey, KDSVersionV3,
)
log := muxClientLog.WithValues("client-id", c.clientID)
log.Info("initializing Kuma Discovery Service (KDS) stream for remote-zone sync of resources")
Contributor

nit: global-zone

@lahabana
Contributor Author

@jakubdyszkiewicz noticed an issue where restarting the global control plane would not make the remote zones reconnect. I need to look into this.

}

func (s *session) close() {
func (s *session) Close() {
Contributor

All these public functions should have godoc comments.

Contributor

Close isn't idempotent, and I think that it's unsafe for anything except the mux server to call it. Can we make it private and consider renaming it?
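For illustration only (the reviewer's actual suggestion is to make the method private): sync.Once is a common way to make a close operation idempotent. The names below are hypothetical, not the PR's code.

package mux // hypothetical package and fields, not the PR's actual code

import "sync"

type session struct {
        closeOnce sync.Once
        done      chan struct{}
}

// Close is safe to call more than once: only the first call closes the channel.
func (s *session) Close() {
        s.closeOnce.Do(func() {
                close(s.done)
        })
}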

s.clientStream.bufferStream.sendResult <- err
case <-s.closing:
select {
case <-s.serverStream.bufferStream.sendBuffer:
Contributor

I think this code is trying to drain the send buffer, but I also don't think that you can actually fix the race like this.

At this point, the buffer stream cancellation has been called. If the other goroutine in Send hasn't checked the context yet, then you are OK: it will error. If it has already checked, then you don't need to poll any channels; just closing sendResult is enough to make it fail.

You still need to deal with closing the sendBuffer channel, which is never closed AFAICT. I think the easiest way to do this is to put a lock in the bufferStream and add an API to close the buffer. Then the close is atomic WRT the sender.

Suggest wrapping a bufferStream API around this to make it clearer. Something like this:

// Assumes "io" and "sync" are imported, along with the mesh_proto package.
type bufferStream struct {
        sendBuffer chan *mesh_proto.Message
        sendResult chan error
        recvBuffer chan *mesh_proto.Message

        lock   sync.Mutex // Protects the write side of the buffer, not the read side.
        closed bool
}

func (k *bufferStream) close() {
        k.lock.Lock()
        defer k.lock.Unlock()

        k.closed = true
        close(k.sendBuffer)
        close(k.sendResult)
        close(k.recvBuffer)
}

func (k *bufferStream) dequeueMessage() *mesh_proto.Message {
        return <-k.sendBuffer
}

func (k *bufferStream) enqueResult(err error) {
        k.lock.Lock()
        defer k.lock.Unlock()

        if !k.closed {
                k.sendResult <- err
        }
}

func (k *bufferStream) Send(message *mesh_proto.Message) error {
        k.lock.Lock()

        if k.closed {
                k.lock.Unlock() // don't hold the lock on the early return
                return io.EOF
        }

        k.sendBuffer <- message

        k.lock.Unlock()

        return <-k.sendResult
}

func (k *bufferStream) Recv() (*mesh_proto.Message, error) {
        k.lock.Lock()

        if k.closed {
                k.lock.Unlock()
                return nil, io.EOF
        }

        k.lock.Unlock()

        r, ok := <-k.recvBuffer
        if !ok {
                // recvBuffer was closed while we were waiting.
                return nil, io.EOF
        }
        return r, nil
}

Then you don't close the bufferStreams from the goroutines; you close them from the session Close.
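A hedged sketch of that last point, reusing the bufferStream type from the snippet above (the session field names are assumptions): Close tears both buffer streams down, and because bufferStream.close() takes the same lock as Send(), the teardown is atomic with respect to concurrent senders.

// Hypothetical wiring; field names are assumptions, not the PR's actual code.
type stream struct {
        bufferStream *bufferStream
}

type session struct {
        clientStream *stream
        serverStream *stream
}

// Close is the only place the bufferStreams are closed; the per-stream
// goroutines never call close() themselves.
func (s *session) Close() {
        s.clientStream.bufferStream.close()
        s.serverStream.bufferStream.close()
}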

Contributor Author

Thanks, it makes sense indeed. I was trying to stay away from mutexes and to find the right way to write this.
I think this code achieves what I was trying to do.

Contributor

With a mutex approach, there's a risk of deadlocking if you need the goroutine and the caller to both hold mutexes. The trick I've used here is that the mutex only guards the write side of the stream object, protecting against sends on closed channels.

@lahabana
Contributor Author

I haven't forgotten about this PR; I just want to add some more e2e tests to KDS before getting back to this.

@lahabana
Contributor Author

lahabana commented Sep 6, 2021

@jpeach @bchatelard @jakubdyszkiewicz I've finally got around to updating this. I still want to check a couple of things, but I think the code is ready to be reviewed.

@codecov-commenter

codecov-commenter commented Sep 6, 2021

Codecov Report

Merging #2573 (693a7cd) into master (0bacb2a) will increase coverage by 0.10%.
The diff coverage is 63.63%.


@@            Coverage Diff             @@
##           master    #2573      +/-   ##
==========================================
+ Coverage   51.67%   51.78%   +0.10%     
==========================================
  Files         870      870              
  Lines       50749    50774      +25     
==========================================
+ Hits        26226    26291      +65     
+ Misses      22428    22381      -47     
- Partials     2095     2102       +7     
Impacted Files Coverage Δ
pkg/kds/client/stream.go 69.33% <ø> (+1.80%) ⬆️
pkg/kds/context/context.go 50.00% <ø> (ø)
pkg/kds/global/components.go 12.28% <0.00%> (-0.69%) ⬇️
pkg/kds/mux/client.go 0.00% <0.00%> (ø)
pkg/kds/mux/server.go 0.00% <0.00%> (ø)
pkg/kds/mux/version.go 12.96% <ø> (ø)
pkg/kds/zone/components.go 16.92% <0.00%> (ø)
pkg/kds/client/sink.go 59.45% <50.00%> (+7.18%) ⬆️
pkg/kds/mux/session.go 84.61% <85.29%> (+30.22%) ⬆️
pkg/kds/mux/clientstream.go 40.00% <85.71%> (-8.28%) ⬇️
... and 14 more

Continue to review the full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 0bacb2a...693a7cd.

Contributor

@jpeach left a comment

Just a few nits and small issues. I also think it's worth adding explanatory comments while this is all fresh in your mind :)

"client-id", c.clientID,
KDSVersionHeaderKey, KDSVersionV3,
)
))
Contributor

Nit: insert paragraph break here :)

start(newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}), stop)
go func() {
_ = newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}).Start()
}()
Contributor

I get a data race detected here:

WARNING: DATA RACE
Write at 0x00c00098ef70 by goroutine 62:
  github.com/kumahq/kuma/pkg/kds/zone_test.glob..func1.3()
      /home/jpeach/upstream/konghq/kuma/pkg/kds/zone/components_test.go:89 +0x679
...

Previous read at 0x00c00098ef70 by goroutine 140:
  github.com/kumahq/kuma/pkg/kds/zone_test.glob..func1.3.1()
      /home/jpeach/upstream/konghq/kuma/pkg/kds/zone/components_test.go:92 +0xbb

Contributor Author

Yes, that's one of the things I still need to check, but I think it's a test-only issue.

@@ -94,7 +88,9 @@ var _ = Describe("Zone Sync", func() {
zoneStore = memory.NewStore()
zoneSyncer = sync_store.NewResourceSyncer(core.Log.WithName("kds-syncer"), zoneStore)

start(newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}), stop)
go func() {
Contributor

Should someone be waiting for the wait group above?
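A hedged sketch of what that could look like, mirroring the test code from the diff; the wait group and the teardown step are assumptions about the surrounding test, not the actual fix.

// Assumed: wg is a sync.WaitGroup declared in the enclosing test scope.
wg.Add(1)
go func() {
        defer wg.Done()
        _ = newPolicySink(zoneName, zoneSyncer, clientStream, &testRuntimeContext{kds: kdsCtx}).Start()
}()

// ... and in the test teardown, after signalling the sink to stop:
// wg.Wait() // guarantees the goroutine has finished before test-scoped state is reused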

close(s.clientStream.responses)
func (k *bufferStream) put(message *mesh_proto.Message) {
k.recvBuffer <- message
}
Contributor

Because this method is only called from handleRecv, and that is the only place that calls close(), writing to the stream without holding the lock is OK.

I experimented with locking this, but it makes the tests time out since it serializes the call WRT Send(). Probably an rwlock is the right choice (hold the write lock when setting the closed flag).
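A minimal sketch of that rwlock idea, under the assumptions that the type keeps the shape from the earlier snippet and that some goroutine is always draining recvBuffer (names and the mesh_proto import path are taken from the surrounding code and may differ from the final version):

package mux // illustrative variant, not the merged code

import (
        "io"
        "sync"

        mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" // import path assumed
)

type rwBufferStream struct {
        recvBuffer chan *mesh_proto.Message

        lock   sync.RWMutex // write-locked only while flipping closed
        closed bool
}

func (k *rwBufferStream) close() {
        k.lock.Lock() // write lock: no put() may be in flight while we close
        defer k.lock.Unlock()

        k.closed = true
        close(k.recvBuffer)
}

// put delivers a received message. The read lock prevents racing with close()
// without serializing concurrent callers against each other. Note that it
// blocks on the channel send while holding the read lock, so a reader must
// keep draining recvBuffer.
func (k *rwBufferStream) put(message *mesh_proto.Message) {
        k.lock.RLock()
        defer k.lock.RUnlock()

        if k.closed {
                return
        }
        k.recvBuffer <- message
}

func (k *rwBufferStream) Recv() (*mesh_proto.Message, error) {
        r, ok := <-k.recvBuffer
        if !ok {
                return nil, io.EOF
        }
        return r, nil
}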

return
}
r := itm.msg
if kdsVersion == KDSVersionV2 && r != nil {
Contributor

If r can actually be nil, then it still gets passed to stream.Send, which doesn't seem right?

Contributor Author

Actually, I forgot to remove the r != nil check; that case is not possible anymore.

@lahabana lahabana merged commit fa4f1bd into kumahq:master Sep 7, 2021
mergify bot pushed a commit that referenced this pull request Sep 7, 2021
(cherry picked from commit fa4f1bd)
lahabana added a commit that referenced this pull request Sep 8, 2021
…2744)

(cherry picked from commit fa4f1bd)

Co-authored-by: Charly Molter <charly.molter@konghq.com>
nikita15p pushed a commit to nikita15p/kuma that referenced this pull request Sep 28, 2021
nikita15p pushed a commit to nikita15p/kuma that referenced this pull request Sep 28, 2021
@lahabana lahabana deleted the fix/kdsMuxGetsStuck branch March 29, 2024 12:42