Skip to content

Remove muxConnectionManager and replace with MuxManager#149

Merged
temporal-nick merged 25 commits intomainfrom
nick/wiremuxmanager
Sep 15, 2025
Merged

Remove muxConnectionManager and replace with MuxManager#149
temporal-nick merged 25 commits intomainfrom
nick/wiremuxmanager

Conversation

@temporal-nick
Copy link
Collaborator

@temporal-nick temporal-nick commented Sep 8, 2025

What was changed

  • Replaced muxConnectionManager with MuxManager in transport.go
  • Allowed passing in a context to WithConnection, threads blocked there will correctly observe configured timeouts
  • New goroutine will wake all threads waiting in WithConnection every 5 seconds to check on their timeouts. This represents no change in lock correctness relative to the existing Broadcast behavior. (e.g. there was already an unlocked section in between waiting threads and the checked condition, and the condition is still checked.)
  • Fine-tuned lifecycle management for more reliable unit tests
    • MuxManagers are now created with New -> Configured -> Started. The MuxManager's MuxProvider keeps track of whether it has been started and whether it has been fully cleaned up. Waiting on CleanupCh from the appropriate component allows goroutines to block until all resources have been closed.

Why?

Replacing the muxConnectionManager will remove its current locking bugs and replace it with this better-behaved pattern.

In adapting the existing unit tests, several bugs were found and fixed, and the observability has been improved.

  • Session.Ping() is now exposed from the MuxManager for easy re-use. Ping() on a dead session will immediately close the channel, removing the need to wait in unit tests.
  • All components now expose a cleanup channel, which removes the need to time.Sleep in unit tests

@temporal-nick temporal-nick requested a review from pglass September 8, 2025 19:39
@temporal-nick temporal-nick requested a review from a team as a code owner September 8, 2025 19:39
@temporal-nick temporal-nick changed the title [WIP] Remove muxConnectionManager and replace with MuxManager Remove muxConnectionManager and replace with MuxManager Sep 9, 2025
@temporal-nick temporal-nick requested review from a team and pglass September 9, 2025 22:48
// TryConnectionOrElse grabs whatever connection is available and runs f on that connection.
// The received SessionWithConn is guaranteed to be nil, a valid yamux session, or a closed yamux session
func TryConnectionOrElse[T any](m *MuxManager, f func(*SessionWithConn) T, other T) T {
func (m *muxManager) TryConnectionOrElse(f func(*SessionWithConn) any, orElse any) any {
Copy link
Contributor

@pglass pglass Sep 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: What do you think about calling this WithConnectionOrElse?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could go either way on this. Usually Try is the thing that tells you the method won't block, but I agree that OrElse does basically the same job in this case

// NewConnection waits on the TCP server for a connection, then provides it
func (r *receivingConnProvider) NewConnection() (net.Conn, error) {
conn, err := r.listener.Accept()
if r.shouldShutDown() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this, since we have the goroutine waiting on shouldShutdown already?

	go func() {
		<-shouldShutDown.Channel()
		err := listener.Close()
		if err != nil {
			logger.Fatal("listener.Close failed", tag.Error(err))
		}
		connPv.hasCleanedUp.Shutdown()
	}()

I guess we could Accept() and then another goroutine could toggle shouldShouldDown to at L80. But if that's a concern, it could be toggled true at L89, also.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The listener would be cancelled regardless. This just customizes the error message

}
func (tm *TransportManager) IsMuxActive(name string) bool {
return tm.muxConnManagers[name].status.Load() == int32(statusStarted)
//return tm.muxConnManagers[name].Load() == int32(statusStarted)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove comment

Co-authored-by: Paul Glass <pnglass@gmail.com>
@temporal-nick temporal-nick merged commit a4ecf67 into main Sep 15, 2025
6 checks passed
@temporal-nick temporal-nick deleted the nick/wiremuxmanager branch September 15, 2025 23:21
hai719 pushed a commit to hai719/s2s-proxy that referenced this pull request Nov 24, 2025
* Remove muxConnectionManager and replace with MuxManager

* Fix unit tests, improve muxManager lifecycle

* Fix shutdownch fake in mux_manager_test

* Add missing channels to test

* Clean up signaling logic, remove generics

* Remove redundant IsShutdown check

* Add metrics and logging for muxmanager

* Ensure metricLabels is set, metrics are configured

* Fix mux_manager_test

* Refine connection metrics

* have mux observer ping the session to get latency stats

* Add more metrics

* More metrics, wrap yamux logger

* Move waiting for client metric close to include error case

* Change to 30s stream timeout

* Enable server log debugging

* more logs

* more logs

* Include details about the replacement connection

* Ensure grpc server restarts on error

* fmt

* Update metrics/prometheus_defs.go

Co-authored-by: Paul Glass <pnglass@gmail.com>

* Requested PR fixes

* fmt

---------

Co-authored-by: Paul Glass <pnglass@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants