Replacement muxManager for existing transport layer #147
Conversation
transport/mux/establisher.go
Outdated
// If the server rapidly disconnects us, we don't want to get caught in a tight loop. Sleep 1-2 seconds before retry
time.Sleep(time.Second + time.Duration(rand.IntN(1000))*time.Millisecond)
}
return &MuxProvider{name, connPv, sessionFn, disconnectFn, transportFn, metricLabels, logger, shutDown, sync.Once{}}, nil
style nit: Use field names to initialize, e.g.
return &MuxProvider{
name: name,
connProvider: connPv,
...
}
transport/mux_manager.go
Outdated
m.connAvailable.L.Unlock()
return errors.New("the mux manager is shutting down")
}
m.connAvailable.Wait()
- When does `Wait()` wake up? (I don't see that we call `m.connAvailable.Signal()` or `.Broadcast()`)
- What if we shut down during `Wait()`? Does it get unblocked somehow?
Oops, I forgot the most important part: ReplaceConnection calls .Broadcast(). I'll add another .Broadcast() on context cancel.
transport/mux/establisher.go
Outdated
}

// GetConnection makes a TCP call to establish a connection, then returns it. Retries with backoff over 30 seconds
func (p *establishingConnProvider) GetConnection() (net.Conn, error) {
nit naming: Maybe GetNewConnection, to clarify it makes a new connection versus returning a previously created connection.
Makes sense, will update
transport/mux_manager.go
Outdated
Should this go into the transport/mux/ package?
Good question. I initially put it in transport because I considered it a replacement for mux_connection_manager+mux_transport, but honestly it's probably cleaner to move it to transport/mux
transport/mux/provider.go
Outdated
m.setNewTransport(session, conn)
metrics.MuxConnectionEstablish.WithLabelValues(m.metricLabels...).Inc()
<-session.CloseChan()
Should we check shutdown chan here? Or are we confident session will be closed when shutdown?
Yeah, I think it's helpful
transport/mux_manager.go
Outdated
// WithConnection waits on connAvailable's condition until the pointer is non-null, then runs the provided function
// with that pointer.
func (m *muxManager) WithConnection(f func(*SessionWithConn) error) error {
I don't see WithConnection being called yet. How will this get used?
That'll be my next PR. transport.go will use this together with GRPC to set up the server and clients
// WithConnection waits on connAvailable's condition until the pointer is non-null, then runs the provided function
// with that pointer.
func (m *MuxManager) WithConnection(f func(*SessionWithConn) error) error {
	m.connAvailable.L.Lock()
nit/minor: sync.Cond is less often used in Go, so this is a little silly, but commenting a bit of guidance or some reminders on using sync.Cond correctly would help when revisiting the code in the future (or maybe just a link to the docs is enough).
Also, anything specific to how we use it here, like when Wait will become unblocked, etc.
Oh yeah I agree with this. I'll add some details
m.connAvailable.L.Unlock()
return errors.New("the mux manager is shutting down")
}
m.connAvailable.Wait()
I still have this question:
What if we shut down during `Wait()`? Does it get unblocked somehow?
Do we need to signal/broadcast on shutdown?
// TryConnectionOrElse grabs whatever connection is available and runs f on that connection.
// The received SessionWithConn is guaranteed to be nil, a valid yamux session, or a closed yamux session
func TryConnectionOrElse[T any](m *MuxManager, f func(*SessionWithConn) T, other T) T {
	conn := m.muxConnection.Load()
not-blocking: I'm a little suspicious that we lock m.muxConnection in other spots but not here.
One thing that comes to mind is a RWMutex, and we could take a read lock - https://pkg.go.dev/sync#RWMutex. This might be awkward when the lock is sync.Cond.L... But we could guarantee that m.muxConnection is not changed during this function.
Maybe it's not a problem. Because we have an atomic value, so the read is safe. But might be good to disallow both TryConnectionOrElse and ReplaceConnection at the same time (not entirely clear how we'll use this yet though)
Oh yeah, TryConnectionOrElse deliberately skips the lock. This is safe because of the atomic.Pointer; it just has no wait semantics. It's intended for things that just need to periodically check on the mux manager without waiting.
existingConn := m.muxConnection.Load()
if existingConn != nil {
	_ = existingConn.session.Close()
	_ = existingConn.conn.Close()
}
// Now add new conn
m.muxConnection.Store(swc)
Is it possible that something could grab the existingConn just after the Load()?
e.g.
- thread 1: conn := m.muxConnection.Load()
- thread 2: conn := m.muxConnection.Load()
- thread 1: conn.Close()
- thread 2: --> error!
And if it's possible, would it be a problem?
This is ok. The session and connection may close at any point, in which case the component that has a reference to MuxManager is expected to call WithConnection again
* Replacement muxManager for existing transport layer
* fix lint errors
* rename packages/files
* Add missing Broadcast, ensure blocked threads are always freed on context close
* Add comments for MuxProvider, remove isDone in favor of Err() != nil
* Simplify mux manager close hook
* Fix nilpointer when replacing nil connection
* Switch to channel.ShutdownOnce for component termination
* Rename GetConnection -> NewConnection
* Add comments
* fmt
What was changed
muxManager added to replace the muxConnectionManager
API:
Compared to muxConnectionManager, muxManager exposes a much simpler API: WithConnection. WithConnection receives a function, blocks the current thread until a connected yamux session is available, and then runs the function with the session. The underlying yamux.Session and Connection values are protected by a condition variable. This guarantees that any number of waiting threads will all see the connection become available at the same time, and that no thread will see a nil session, as long as all updates go through the corresponding ReplaceConnection API.
MuxProvider
Interior struct modeling a single source of mux connections. A MuxProvider can be started once, and will run until its context is cancelled. Once started, the MuxProvider will repeatedly run its `connProvider` and `sessionFn`, call ReplaceConnection on their muxManager, and then block until the yamux session is closed. Mux providers observe a cancellable context to allow other components to stop them from running.
establisher.go, receiver.go, and observer.go
The logic from clientLoop and serverLoop has been reorganized and split into establisher.go and receiver.go. observeYamuxSession has moved to observer.go to help keep each file focused on a single task
Why?
This reorganization fixes some incorrect synchronization in the current muxConnectionManager. The original code had multiple places where control flow escaped the component, making it impossible to ensure lock safety in some cases. The new component avoids that problem by limiting interaction with the struct internals.