diff --git a/xgress/xgress.go b/xgress/xgress.go index 4d673729..a58f2317 100644 --- a/xgress/xgress.go +++ b/xgress/xgress.go @@ -61,7 +61,7 @@ type AckSender interface { type OptionsData map[interface{}]interface{} -// The BindHandlers are invoked to install the appropriate handlers. +// BindHandler is an interface invoked to install the appropriate handlers. type BindHandler interface { HandleXgressBind(x *Xgress) } @@ -923,7 +923,7 @@ func (self *Xgress) SendEmptyAck() { func (self *Xgress) GetSequence() uint64 { self.rxSequenceLock.Lock() defer self.rxSequenceLock.Unlock() - return uint64(self.rxSequence) + return self.rxSequence } func (self *Xgress) getLastBufferSizeSent() uint32 { diff --git a/ziti/edge/conn.go b/ziti/edge/conn.go index b9864de7..e0b80d75 100644 --- a/ziti/edge/conn.go +++ b/ziti/edge/conn.go @@ -18,10 +18,6 @@ package edge import ( "fmt" - "github.com/openziti/edge-api/rest_model" - "github.com/openziti/foundation/v2/concurrenz" - "github.com/openziti/sdk-golang/xgress" - "github.com/openziti/secretstream/kx" "io" "net" "os" @@ -31,7 +27,11 @@ import ( "github.com/google/uuid" "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v4" + "github.com/openziti/edge-api/rest_model" + "github.com/openziti/foundation/v2/concurrenz" "github.com/openziti/foundation/v2/sequence" + "github.com/openziti/sdk-golang/xgress" + "github.com/openziti/secretstream/kx" ) const ( diff --git a/ziti/edge/msg_mux.go b/ziti/edge/msg_mux.go index 38f26013..ab395547 100644 --- a/ziti/edge/msg_mux.go +++ b/ziti/edge/msg_mux.go @@ -18,6 +18,11 @@ package edge import ( "fmt" + "math" + "strings" + "sync/atomic" + "time" + "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v4" "github.com/openziti/sdk-golang/inspect" @@ -25,47 +30,309 @@ import ( cmap "github.com/orcaman/concurrent-map/v2" "github.com/pkg/errors" "github.com/sirupsen/logrus" - "math" - "strings" - "sync/atomic" - "time" ) -type MsgSink interface { - HandleMuxClose() error +// MsgSink represents a message handler that can receive and process messages +// for a specific connection in a multiplexed channel. Each MsgSink is associated +// with a unique connection ID and handles the message flow for that logical connection. +// +// MsgSink implementations are responsible for: +// - Processing incoming messages from the multiplexer +// - Providing a unique connection ID for routing +// - Handling connection lifecycle events (setup, teardown) +// - Managing connection-specific state and sequencing +// - Storing and retrieving arbitrary context data +// +// Thread Safety: Implementations should be safe for concurrent use, as messages +// may be delivered from multiple goroutines. +type MsgSink[T any] interface { + // Id returns the unique connection ID that this sink handles. + // This ID is used by the ConnMux to route messages to the correct sink. + // The ID must remain constant for the lifetime of the sink. + // + // Returns: + // uint32 - the connection ID for this message sink + // + // Example: + // connId := sink.Id() + // // Use connId for routing decisions Id() uint32 + + // Accept processes an incoming message for this connection. + // The message is guaranteed to be intended for this sink's connection ID. + // The sink should handle the message appropriately based on its type and content. + // + // Parameters: + // msg - the incoming message to process + // + // Example: + // func (s *myMsgSink) Accept(msg *channel.Message) { + // switch msg.ContentType { + // case edge.ContentTypeData: + // s.handleData(msg) + // case edge.ContentTypeStateClosed: + // s.handleClose(msg) + // } + // } Accept(msg *channel.Message) + + // HandleMuxClose is called when the underlying multiplexer is closing. + // This gives the sink an opportunity to perform cleanup operations + // and notify any dependent components that the connection is being terminated. + // + // The sink should: + // - Release any held resources + // - Notify dependent components of the closure + // - Complete any pending operations gracefully + // + // Returns: + // error - any error encountered during cleanup, nil if successful + // + // Example: + // func (s *myMsgSink) HandleMuxClose() error { + // s.cleanup() + // return s.conn.Close() + // } + HandleMuxClose() error + + // GetData retrieves arbitrary context data associated with this connection. + // This allows implementing code to store and retrieve connection-specific + // context, state, or metadata that may be needed across different operations. + // + // The data is connection-scoped and persists for the lifetime of the MsgSink. + // Returns nil if no data has been set. + // + // Returns: + // T - the stored context data, or nil if none exists + // + // Example: + // userData := sink.GetData() + // if edgeCtx, ok := userData.(*EdgeConnContext); ok { + // // Use context + // } + GetData() T + + // SetData stores arbitrary context data associated with this connection. + // This allows implementing code to attach connection-specific context, + // state, or metadata that can be retrieved later during the connection's lifetime. + // + // The data should be treated as connection-scoped and will be available + // until the connection is closed or the data is overwritten. + // + // Parameters: + // data - arbitrary context data to associate with this connection + // + // Example: + // authCtx := &AuthContext{UserID: "user123", Permissions: perms} + // sink.SetData(authCtx) + // + // // Later retrieve it + // if ctx := sink.GetData(); ctx != nil { + // edgeCtx := ctx.(*EdgeConnContext) + // // Use context + // } + SetData(data T) } -type MsgMux interface { - channel.TypedReceiveHandler - channel.CloseHandler - AddMsgSink(sink MsgSink) error - RemoveMsgSink(sink MsgSink) - RemoveMsgSinkById(sinkId uint32) +// ConnMux (Connection Multiplexer) manages multiple logical connections +// over a single channel. It provides message routing and connection lifecycle +// management based on connection IDs (connId). +// +// The multiplexer enables efficient use of transport resources by allowing many +// application-level connections to share a single underlying channel connection. +// Each logical connection is identified by a unique uint32 connection ID. +// +// Thread Safety: Implementations must be safe for concurrent use by multiple goroutines. +type ConnMux[T any] interface { + // Add registers a message handler for a specific connection. + // The sink's ID() method determines which connection ID it handles. + // Returns an error if the connection ID is already registered or if + // registration fails for any other reason. + // + // Example: + // conn := &edgeXgressConn{connId: 12345, ...} + // err := mux.Add(conn) + Add(sink MsgSink[T]) error + + // Remove unregisters the specified message handler. + // This removes the connection from the multiplexer's routing table. + // The method is idempotent - removing a non-existent sink is not an error. + // + // Example: + // mux.Remove(conn) + Remove(sink MsgSink[T]) + + // RemoveByConnId removes a connection by its ID from the multiplexer. + // This is a convenience method that removes the connection without + // requiring a reference to the original MsgSink. + // The method is idempotent - removing a non-existent connection ID is not an error. + // + // Parameters: + // connId - the connection ID to remove + // + // Example: + // mux.RemoveByConnId(12345) + RemoveByConnId(connId uint32) + + // Close shuts down the multiplexer and all managed connections. + // After calling Close, the multiplexer should not accept new connections + // or route any more messages. All registered message sinks will be notified + // of the closure. + // + // This method should be called when the underlying channel is being closed + // to ensure proper cleanup of all multiplexed connections. Close() + + // GetActiveConnIds returns a slice of all active connection IDs. + // This method provides visibility into which connections are currently + // being managed by the multiplexer. + // + // Returns: + // []uint32 - slice of active connection IDs, may be empty if no connections exist + // + // Example: + // connIds := mux.GetActiveConnIds() + // fmt.Printf("Active connections: %v\n", connIds) + GetActiveConnIds() []uint32 + + // HasConn checks if a specific connection ID is currently active. + // This is useful for validation before attempting operations on a connection. + // + // Parameters: + // connId - the connection ID to check + // + // Returns: + // bool - true if the connection exists, false otherwise + // + // Example: + // if mux.HasConn(12345) { + // // connection exists, safe to send messages + // } + HasConn(connId uint32) bool + + // GetConnCount returns the number of active connections. + // This provides a quick way to check multiplexer load without + // allocating a slice of connection IDs. + // + // Returns: + // int - number of active connections + // + // Example: + // count := mux.GetConnCount() + // if count > maxConnections { + // // handle overload condition + // } + GetConnCount() int + + // GetSinks returns a snapshot of all currently active message sinks managed by this multiplexer. + // This method provides access to the complete set of active connections and their associated + // message handlers, indexed by their connection IDs. + // + // The returned map is a snapshot taken at the time of the call and will not reflect + // subsequent additions or removals of connections. This method is useful for: + // - Administrative operations that need to inspect all active connections + // - Debugging and monitoring tools + // - Broadcasting operations across all connections + // - Connection lifecycle management and cleanup + // + // Returns: + // map[uint32]MsgSink[T] - a map of connection IDs to their corresponding message sinks + // + // Thread Safety: This method is safe for concurrent use. + // + // Example: + // sinks := mux.GetSinks() + // for connId, sink := range sinks { + // fmt.Printf("Connection %d: %v\n", connId, sink.GetData()) + // } + GetSinks() map[uint32]MsgSink[T] + + // GetNextId generates the next available connection ID for creating new connections. + // This method ensures that connection IDs are unique within the multiplexer's scope + // and handles ID allocation automatically. The implementation may use sequential + // numbering, random generation, or other strategies to avoid collisions. + // + // The returned ID is guaranteed to be unique among currently active connections + // and can be safely used for creating new MsgSink instances. + // + // Returns: + // uint32 - a unique connection ID that can be used for a new connection + // + // Example: + // connId := mux.GetNextId() + // conn := &myConnection{id: connId} + // mux.Add(conn) GetNextId() uint32 + + // TypedReceiveHandler is an embedded interface + // + // TypedReceiveHandler provides typed message handling capabilities for the multiplexer. + // This allows the ConnMux to be registered as a message handler with the underlying + // channel infrastructure, enabling automatic message routing based on message types. + // + // The embedded interface typically includes methods like: + // - ContentType() int32 - returns the message content type this handler processes + // - HandleReceive(msg *channel.Message, ch channel.Channel) - processes typed messages + // + // This integration allows the ConnMux to participate in the channel's message + // dispatch system while maintaining its connection multiplexing functionality. + channel.TypedReceiveHandler + + // CloseHandler is an embedded interface + // + // CloseHandler provides automatic cleanup capabilities when the underlying channel closes. + // This ensures that all multiplexed connections are properly notified and cleaned up + // when the physical channel connection is terminated. + // + // The embedded interface typically includes: + // - HandleClose(ch channel.Channel) - called when the channel is closing + // + // When the channel closes, the ConnMux should: + // - Notify all registered MsgSinks via their HandleMuxClose() method + // - Clean up internal routing tables and connection state + // - Release any held resources + // + // This automatic integration ensures graceful shutdown of all multiplexed + // connections without requiring manual cleanup by the application. + channel.CloseHandler } -func NewMapMsgMux() MsgMux { - result := &MsgMuxImpl{ +func NewChannelConnMapMux[T any]() ConnMux[T] { + result := &ConnMuxImpl[T]{ maxId: (math.MaxUint32 / 2) - 1, - sinks: cmap.NewWithCustomShardingFunction[uint32, MsgSink](func(key uint32) uint32 { + sinks: cmap.NewWithCustomShardingFunction[uint32, MsgSink[T]](func(key uint32) uint32 { return key }), } return result } -type MsgMuxImpl struct { +type ConnMuxImpl[T any] struct { closed atomic.Bool - sinks cmap.ConcurrentMap[uint32, MsgSink] + sinks cmap.ConcurrentMap[uint32, MsgSink[T]] nextId uint32 minId uint32 maxId uint32 } -func (mux *MsgMuxImpl) GetNextId() uint32 { +func (mux *ConnMuxImpl[T]) GetActiveConnIds() []uint32 { + return mux.sinks.Keys() +} + +func (mux *ConnMuxImpl[T]) HasConn(connId uint32) bool { + _, found := mux.sinks.Get(connId) + return found +} + +func (mux *ConnMuxImpl[T]) GetConnCount() int { + return mux.sinks.Count() +} + +func (mux *ConnMuxImpl[T]) GetSinks() map[uint32]MsgSink[T] { + return mux.sinks.Items() +} + +func (mux *ConnMuxImpl[T]) GetNextId() uint32 { nextId := atomic.AddUint32(&mux.nextId, 1) for { if _, found := mux.sinks.Get(nextId); found { @@ -82,11 +349,11 @@ func (mux *MsgMuxImpl) GetNextId() uint32 { } } -func (mux *MsgMuxImpl) ContentType() int32 { +func (mux *ConnMuxImpl[T]) ContentType() int32 { return ContentTypeData } -func (mux *MsgMuxImpl) HandleReceive(msg *channel.Message, ch channel.Channel) { +func (mux *ConnMuxImpl[T]) HandleReceive(msg *channel.Message, ch channel.Channel) { connId, found := msg.GetUint32Header(ConnIdHeader) if !found { if msg.ContentType == ContentTypeInspectRequest { @@ -116,7 +383,7 @@ func (mux *MsgMuxImpl) HandleReceive(msg *channel.Message, ch channel.Channel) { } } -func (mux *MsgMuxImpl) handlePayloadWithNoSink(msg *channel.Message, ch channel.Channel) { +func (mux *ConnMuxImpl[T]) handlePayloadWithNoSink(msg *channel.Message, ch channel.Channel) { connId, _ := msg.GetUint32Header(ConnIdHeader) payload, err := xgress.UnmarshallPayload(msg) if err == nil { @@ -135,7 +402,7 @@ func (mux *MsgMuxImpl) handlePayloadWithNoSink(msg *channel.Message, ch channel. } } -func (mux *MsgMuxImpl) HandleInspect(msg *channel.Message, ch channel.Channel) { +func (mux *ConnMuxImpl[T]) HandleInspect(msg *channel.Message, ch channel.Channel) { resp := &inspect.SdkInspectResponse{ Success: true, Values: make(map[string]any), @@ -172,7 +439,7 @@ func (mux *MsgMuxImpl) HandleInspect(msg *channel.Message, ch channel.Channel) { mux.returnInspectResponse(msg, ch, resp) } -func (mux *MsgMuxImpl) returnInspectResponse(msg *channel.Message, ch channel.Channel, resp *inspect.SdkInspectResponse) { +func (mux *ConnMuxImpl[T]) returnInspectResponse(msg *channel.Message, ch channel.Channel, resp *inspect.SdkInspectResponse) { var sender channel.Sender = ch if mc, ok := ch.(channel.MultiChannel); ok { if sdkChan, ok := mc.GetUnderlayHandler().(SdkChannel); ok { @@ -192,11 +459,11 @@ func (mux *MsgMuxImpl) returnInspectResponse(msg *channel.Message, ch channel.Ch } } -func (mux *MsgMuxImpl) HandleClose(channel.Channel) { +func (mux *ConnMuxImpl[T]) HandleClose(channel.Channel) { mux.Close() } -func (mux *MsgMuxImpl) AddMsgSink(sink MsgSink) error { +func (mux *ConnMuxImpl[T]) Add(sink MsgSink[T]) error { if mux.closed.Load() { return errors.Errorf("mux is closed, can't add sink with id [%v]", sink.Id()) } @@ -207,15 +474,15 @@ func (mux *MsgMuxImpl) AddMsgSink(sink MsgSink) error { return nil } -func (mux *MsgMuxImpl) RemoveMsgSink(sink MsgSink) { - mux.RemoveMsgSinkById(sink.Id()) +func (mux *ConnMuxImpl[T]) Remove(sink MsgSink[T]) { + mux.RemoveByConnId(sink.Id()) } -func (mux *MsgMuxImpl) RemoveMsgSinkById(sinkId uint32) { - mux.sinks.Remove(sinkId) +func (mux *ConnMuxImpl[T]) RemoveByConnId(connId uint32) { + mux.sinks.Remove(connId) } -func (mux *MsgMuxImpl) Close() { +func (mux *ConnMuxImpl[T]) Close() { if mux.closed.CompareAndSwap(false, true) { // we don't need to lock the mux because due to the atomic bool, only one go-routine will enter this. // If the sink HandleMuxClose methods do anything with the mux, like remove themselves, they will acquire @@ -224,7 +491,7 @@ func (mux *MsgMuxImpl) Close() { for _, val := range sinks { if err := val.HandleMuxClose(); err != nil { pfxlog.Logger(). - WithField("sinkId", val.Id()). + WithField("connId", val.Id()). WithError(err). Error("error while closing message sink") } diff --git a/ziti/edge/network/conn.go b/ziti/edge/network/conn.go index 8499ea48..a7fdfa3c 100644 --- a/ziti/edge/network/conn.go +++ b/ziti/edge/network/conn.go @@ -17,10 +17,11 @@ package network import ( + "crypto/rand" + "encoding/base64" + "encoding/binary" "encoding/json" "fmt" - "github.com/openziti/sdk-golang/inspect" - "github.com/openziti/sdk-golang/xgress" "io" "net" "strconv" @@ -29,13 +30,12 @@ import ( "sync/atomic" "time" - "crypto/rand" - "encoding/base64" - "encoding/binary" "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v4" "github.com/openziti/edge-api/rest_model" "github.com/openziti/foundation/v2/info" + "github.com/openziti/sdk-golang/inspect" + "github.com/openziti/sdk-golang/xgress" "github.com/openziti/sdk-golang/ziti/edge" "github.com/openziti/secretstream" "github.com/openziti/secretstream/kx" @@ -47,33 +47,168 @@ var unsupportedCrypto = errors.New("unsupported crypto") var _ edge.Conn = &edgeConn{} +// edgeConn represents an individual connection in the Ziti edge network. +// It implements both edge.Conn (providing standard network connection semantics) and edge.MsgSink +// (handling connection-specific messages from the edge router). +// +// Use Cases: +// - Service Hosting: Created by edgeHostConn when a client dials a hosted service +// - Service Consumption: Created when this SDK dials a service hosted elsewhere +// +// Architecture: +// - Registered with a msgMux to receive connection-specific messages +// - Handles data transfer, flow control, and encryption for a single session +// - Provides standard Read/Write interface while managing underlying message protocols +// - Manages connection lifecycle from establishment to termination +// +// Message Flow: +// 1. Remote peer sends data → Edge router → msgMux routes to this edgeConn.Accept() +// 2. edgeConn.Accept() processes message based on content type (data, state, ack, etc.) +// 3. Application reads data via Read() method from internal buffer +// 4. Application writes data via Write() method, which sends to edge router +// +// Lifecycle: +// 1. Created during connection establishment (dial or accept) +// 2. Added to msgMux for message routing +// 3. Handles session until Close() or remote disconnect +// 4. Removed from msgMux and cleaned up +// +// Thread Safety: All methods are safe for concurrent use. type edgeConn struct { + // MsgChannel provides the underlying channel communication capabilities + // for sending messages back to the edge router (data, state changes, acks, etc.) edge.MsgChannel - readQ *noopSeq[*channel.Message] - inBuffer [][]byte - msgMux edge.MsgMux - flags uint32 - closed atomic.Bool - closeNotify chan struct{} - readFIN atomic.Bool - sentFIN atomic.Bool - serviceName string - sourceIdentity string + + // readQ sequences incoming messages for ordered delivery to the application. + // Ensures data messages are delivered in the correct order even with + // concurrent message processing. + readQ *noopSeq[*channel.Message] + + // inBuffer stores message data that has been received but not yet read + // by the application. Acts as a buffer between network messages and Read() calls. + inBuffer [][]byte + + // msgMux is the parent connection multiplexer that manages this connection. + // Used to remove this connection when it closes. + msgMux edge.ConnMux[any] + + // flags stores various connection state flags for internal protocol handling. + flags uint32 + + // closed indicates whether this connection has been terminated. + // Used to prevent operations on closed connections. + closed atomic.Bool + + // closeNotify is used to signal connection closure to waiting goroutines. + // Closed when the connection is terminated. + closeNotify chan struct{} + + // readFIN indicates that the remote side has finished sending data. + // When true, no more data will be received from the client. + readFIN atomic.Bool + + // sentFIN indicates that this side has finished sending data. + // When true, no more data can be sent to the client. + sentFIN atomic.Bool + + // serviceName is the name of the service this connection is accessing. + // Used for logging and debugging purposes. + serviceName string + + // sourceIdentity identifies the remote peer that established this connection. + // For hosted services: identifies the client dialing the service + // For consumed services: identifies the service host + // Used for authorization, logging, and audit purposes. + sourceIdentity string + + // acceptCompleteHandler handles the completion of the connection handshake. + // Set during connection establishment and cleared after handshake completion. acceptCompleteHandler *newConnHandler - marker string - circuitId string - customState map[int32][]byte - crypto bool - keyPair *kx.KeyPair - rxKey []byte + // marker is a unique identifier for this connection instance. + // Used for tracing and debugging across the distributed system. + marker string + + // circuitId identifies the network circuit used for this connection. + // Used for routing and network-level debugging. + circuitId string + + // customState stores protocol-specific state information as key-value pairs. + // Used by the edge protocol for managing connection-level metadata. + customState map[int32][]byte + + // crypto indicates whether end-to-end encryption is enabled for this connection. + // When true, all data is encrypted/decrypted using the sender/receiver. + crypto bool + + // keyPair contains the cryptographic keys for this connection's encryption. + // Used during encryption setup and for deriving session keys. + keyPair *kx.KeyPair + + // rxKey is the derived key used for decrypting incoming data. + // Generated during the encryption handshake process. + rxKey []byte + + // receiver handles decryption of incoming encrypted data from the client. + // Only used when crypto is enabled. receiver secretstream.Decryptor - sender secretstream.Encryptor - appData []byte + + // sender handles encryption of outgoing data to the client. + // Only used when crypto is enabled. + sender secretstream.Encryptor + + // appData contains application-specific data sent during connection establishment. + // Available to the hosting application for connection-specific context. + appData []byte + + // Mutex protects concurrent access to encryption/decryption operations + // and other critical sections that modify connection state. sync.Mutex - dataSink io.Writer + // dataSink is an optional writer that receives a copy of all data + // sent through this connection. Used for logging or monitoring purposes. + dataSink io.Writer + + // xgCircuit manages the underlying transport circuit for this connection. + // Handles flow control, acknowledgments, and low-level data transfer. xgCircuit *XgAdapter + + // data stores arbitrary connection-specific context information that can be + // accessed by the application. This might include session data, authentication + // state, request context, or other per-connection metadata. + // For hosted services: stores client-specific context + // For consumed services: stores connection-specific application state + data atomic.Value +} + +// GetData retrieves arbitrary connection-specific context data associated with this connection. +// This allows applications to store and retrieve per-connection state, session information, +// or metadata that applies to this individual connection. +// +// Returns: +// - any: the stored connection context data, or nil if none has been set +// +// Examples of connection-specific data: +// - User authentication state and session tokens +// - Request context and correlation IDs +// - Connection-specific configuration or feature flags +// - Per-connection metrics or rate limiting state +// - Custom connection handlers or middleware +// - Application-specific connection state +func (conn *edgeConn) GetData() any { + return conn.data.Load() +} + +// SetData stores arbitrary connection-specific context data for this connection. +// This data persists for the lifetime of this connection and can be accessed +// during message processing or other connection operations. +// +// Parameters: +// - data: arbitrary context data to associate with this connection +// +// Thread Safety: This method is safe for concurrent use. +func (conn *edgeConn) SetData(data any) { + conn.data.Store(data) } func (conn *edgeConn) Write(data []byte) (int, error) { @@ -710,7 +845,7 @@ func (conn *edgeConn) close(closedByRemote bool) { } } - conn.msgMux.RemoveMsgSink(conn) // if we switch back to ChMsgMux will need to be done async again, otherwise we may deadlock + conn.msgMux.Remove(conn) // if we switch back to ChMsgMux will need to be done async again, otherwise we may deadlock } else { // cancel any pending writes _ = conn.xgCircuit.writeAdapter.SetWriteDeadline(time.Now()) diff --git a/ziti/edge/network/conn_test.go b/ziti/edge/network/conn_test.go index cbdad8b7..6396bec7 100644 --- a/ziti/edge/network/conn_test.go +++ b/ziti/edge/network/conn_test.go @@ -3,14 +3,15 @@ package network import ( "crypto/x509" "encoding/binary" - "github.com/openziti/channel/v4" - "github.com/openziti/foundation/v2/sequencer" - "github.com/openziti/sdk-golang/ziti/edge" - "github.com/stretchr/testify/require" "io" "sync/atomic" "testing" "time" + + "github.com/openziti/channel/v4" + "github.com/openziti/foundation/v2/sequencer" + "github.com/openziti/sdk-golang/ziti/edge" + "github.com/stretchr/testify/require" ) func BenchmarkConnWriteBaseLine(b *testing.B) { @@ -32,7 +33,7 @@ func BenchmarkConnWrite(b *testing.B) { closeNotify := make(chan struct{}) defer close(closeNotify) - mux := edge.NewMapMsgMux() + mux := edge.NewChannelConnMapMux[any]() testChannel := edge.NewSingleSdkChannel(&NoopTestChannel{}) conn := &edgeConn{ MsgChannel: *edge.NewEdgeMsgChannel(testChannel, 1), @@ -43,7 +44,7 @@ func BenchmarkConnWrite(b *testing.B) { req := require.New(b) - req.NoError(mux.AddMsgSink(conn)) + req.NoError(mux.Add(conn)) data := make([]byte, 1024) @@ -58,7 +59,7 @@ func BenchmarkConnRead(b *testing.B) { closeNotify := make(chan struct{}) defer close(closeNotify) - mux := edge.NewMapMsgMux() + mux := edge.NewChannelConnMapMux[any]() testChannel := edge.NewSingleSdkChannel(&NoopTestChannel{}) readQ := NewNoopSequencer[*channel.Message](closeNotify, 4) @@ -88,7 +89,7 @@ func BenchmarkConnRead(b *testing.B) { req := require.New(b) - req.NoError(mux.AddMsgSink(conn)) + req.NoError(mux.Add(conn)) data := make([]byte, 1024) @@ -135,7 +136,7 @@ func TestReadMultipart(t *testing.T) { closeNotify := make(chan struct{}) defer close(closeNotify) - mux := edge.NewMapMsgMux() + mux := edge.NewChannelConnMapMux[any]() testChannel := edge.NewSingleSdkChannel(&NoopTestChannel{}) readQ := NewNoopSequencer[*channel.Message](closeNotify, 4) diff --git a/ziti/edge/network/factory.go b/ziti/edge/network/factory.go index daa2a6c4..aaef4319 100644 --- a/ziti/edge/network/factory.go +++ b/ziti/edge/network/factory.go @@ -39,7 +39,7 @@ type routerConn struct { routerName string key string ch edge.SdkChannel - msgMux edge.MsgMux + mux edge.ConnMux[any] owner RouterConnOwner } @@ -66,7 +66,7 @@ func NewEdgeConnFactory(routerName, key string, owner RouterConnOwner) edge.Rout connFactory := &routerConn{ key: key, routerName: routerName, - msgMux: edge.NewMapMsgMux(), + mux: edge.NewChannelConnMapMux[any](), owner: owner, } @@ -81,33 +81,33 @@ func (conn *routerConn) BindChannel(binding channel.Binding) error { conn.ch = edge.NewSingleSdkChannel(binding.GetChannel()) } - binding.AddReceiveHandlerF(edge.ContentTypeDial, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeStateClosed, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeTraceRoute, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeConnInspectRequest, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeBindSuccess, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeXgPayload, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeXgAcknowledgement, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeXgControl, conn.msgMux.HandleReceive) - binding.AddReceiveHandlerF(edge.ContentTypeInspectRequest, conn.msgMux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeDial, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeStateClosed, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeTraceRoute, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeConnInspectRequest, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeBindSuccess, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeXgPayload, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeXgAcknowledgement, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeXgControl, conn.mux.HandleReceive) + binding.AddReceiveHandlerF(edge.ContentTypeInspectRequest, conn.mux.HandleReceive) // Since data is the common message type, it gets to be dispatched directly - binding.AddTypedReceiveHandler(conn.msgMux) - binding.AddCloseHandler(conn.msgMux) + binding.AddTypedReceiveHandler(conn.mux) + binding.AddCloseHandler(conn.mux) binding.AddCloseHandler(conn) return nil } func (conn *routerConn) NewDialConn(service *rest_model.ServiceDetail) *edgeConn { - id := conn.msgMux.GetNextId() + id := conn.mux.GetNextId() closeNotify := make(chan struct{}) edgeCh := &edgeConn{ closeNotify: closeNotify, MsgChannel: *edge.NewEdgeMsgChannel(conn.ch, id), readQ: NewNoopSequencer[*channel.Message](closeNotify, 4), - msgMux: conn.msgMux, + msgMux: conn.mux, serviceName: *service.Name, marker: newMarker(), } @@ -121,7 +121,7 @@ func (conn *routerConn) NewDialConn(service *rest_model.ServiceDetail) *edgeConn } } - err = conn.msgMux.AddMsgSink(edgeCh) // duplicate errors only happen on the server side, since client controls ids + err = conn.mux.Add(edgeCh) // duplicate errors only happen on the server side, since client controls ids if err != nil { pfxlog.Logger().Warnf("error adding message sink %s[%d]: %v", *service.Name, id, err) } @@ -150,11 +150,11 @@ func (conn *routerConn) UpdateToken(token []byte, timeout time.Duration) error { } func (conn *routerConn) NewListenConn(service *rest_model.ServiceDetail, keyPair *kx.KeyPair) *edgeHostConn { - id := conn.msgMux.GetNextId() + id := conn.mux.GetNextId() edgeCh := &edgeHostConn{ MsgChannel: *edge.NewEdgeMsgChannel(conn.ch, id), - msgMux: conn.msgMux, + msgMux: conn.mux, serviceName: *service.Name, keyPair: keyPair, crypto: keyPair != nil, @@ -162,7 +162,7 @@ func (conn *routerConn) NewListenConn(service *rest_model.ServiceDetail, keyPair } // duplicate errors only happen on the server side, since the client controls ids - if err := conn.msgMux.AddMsgSink(edgeCh); err != nil { + if err := conn.mux.Add(edgeCh); err != nil { pfxlog.Logger().Warnf("error adding message sink %s[%d]: %v", *service.Name, id, err) } diff --git a/ziti/edge/network/hosting_conn.go b/ziti/edge/network/hosting_conn.go index a8b80515..ab66f718 100644 --- a/ziti/edge/network/hosting_conn.go +++ b/ziti/edge/network/hosting_conn.go @@ -19,6 +19,9 @@ package network import ( "encoding/json" "fmt" + "sync/atomic" + "time" + "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v4" "github.com/openziti/edge-api/rest_model" @@ -27,21 +30,109 @@ import ( "github.com/openziti/secretstream/kx" cmap "github.com/orcaman/concurrent-map/v2" "github.com/sirupsen/logrus" - "sync/atomic" - "time" ) +// edgeHostConn represents a service hosting connection that acts as a "receptionist" +// for incoming client dial requests. It implements edge.MsgSink to handle service-level +// messages and manages multiple client connections through its embedded ConnMux. +// +// Architecture: +// - Receives dial requests from clients wanting to connect to the hosted service +// - Creates individual edgeConn instances for each accepted client connection +// - Routes ongoing client messages directly to their respective edgeConn via msgMux +// - Manages service lifecycle (bind, unbind, close) with the edge router +// +// Message Flow: +// 1. Client sends dial request → edgeHostConn.Accept() handles it +// 2. edgeHostConn creates new edgeConn for the client +// 3. edgeConn is added to msgMux for future message routing +// 4. Client's data messages bypass edgeHostConn and go directly to edgeConn.Accept() +// +// Thread Safety: All methods are safe for concurrent use. type edgeHostConn struct { + // MsgChannel provides the underlying channel communication capabilities + // for sending messages back to the edge router (bind requests, state changes, etc.) edge.MsgChannel - msgMux edge.MsgMux - hosting cmap.ConcurrentMap[string, *edgeListener] - closed atomic.Bool + + // msgMux manages individual client connections created from dial requests. + // Each accepted client gets an edgeConn that is registered with this mux. + // Future messages for specific clients are routed directly to their edgeConn. + msgMux edge.ConnMux[any] + + // hosting maps session tokens to their corresponding edgeListener instances. + // Each token represents a service binding session with the edge router. + hosting cmap.ConcurrentMap[string, *edgeListener] + + // closed indicates whether this hosting connection has been terminated. + // Used to prevent new operations on a closed connection. + closed atomic.Bool + + // serviceName is the name of the service being hosted by this connection. + // Used for logging and debugging purposes. serviceName string - marker string - crypto bool - keyPair *kx.KeyPair + + // marker is a unique identifier for this hosting connection instance. + // Used for tracing and debugging across the distributed system. + marker string + + // crypto indicates whether end-to-end encryption is required for client connections. + // When true, client connections must establish encrypted sessions using keyPair. + crypto bool + + // keyPair contains the cryptographic keys used for end-to-end encryption + // when crypto is enabled. Used during client connection handshake. + keyPair *kx.KeyPair + + // data stores arbitrary service-level context information that can be + // accessed by the hosting application. This might include service configuration, + // authentication policies, metrics collectors, or other service-wide state. + // Unlike client-specific data in edgeConn, this context applies to the entire service. + data atomic.Value +} + +// GetData retrieves arbitrary service-level context data associated with this hosting connection. +// This allows hosting applications to store and retrieve service-wide configuration, +// state, or metadata that applies to all clients of this service. +// +// Returns: +// - any: the stored service context data, or nil if none has been set +// +// Examples of service-level data: +// - Service configuration and feature flags +// - Authentication and authorization policies +// - Metrics collectors or connection limits +// - Custom service handlers or middleware +func (conn *edgeHostConn) GetData() any { + return conn.data.Load() +} + +// SetData stores arbitrary service-level context data for this hosting connection. +// This data persists for the lifetime of the service and can be accessed during +// client connection handling or other service operations. +// +// Parameters: +// - data: arbitrary context data to associate with this service +// +// Thread Safety: This method is safe for concurrent use. +func (conn *edgeHostConn) SetData(data any) { + conn.data.Store(data) } +// Accept implements edge.MsgSink and handles service-level messages for this hosting connection. +// This method acts as the "receptionist" that processes incoming requests and manages +// the service lifecycle. It does NOT handle individual client data messages - those +// are routed directly to the appropriate edgeConn via msgMux. +// +// Handled Message Types: +// - ContentTypeDial: Creates new client connections for dial requests +// - ContentTypeStateClosed: Handles service shutdown notifications +// - ContentTypeBindSuccess: Confirms service binding and notifies listeners +// - ContentTypeConnInspectRequest: Provides service inspection data +// +// Parameters: +// - msg: the incoming message to process +// +// Thread Safety: This method is safe for concurrent use. func (conn *edgeHostConn) Accept(msg *channel.Message) { conn.TraceMsg("Accept", msg) @@ -158,7 +249,7 @@ func (conn *edgeHostConn) newChildConnection(message *channel.Message) { WithField("token", token). WithField("circuitId", circuitId) - err := conn.msgMux.AddMsgSink(edgeCh) // duplicate errors only happen on the server side, since client controls ids + err := conn.msgMux.Add(edgeCh) // duplicate errors only happen on the server side, since client controls ids if err != nil { conn.close(true) diff --git a/ziti/edge/network/xg_adapter.go b/ziti/edge/network/xg_adapter.go index fc020966..b6e5c49d 100644 --- a/ziti/edge/network/xg_adapter.go +++ b/ziti/edge/network/xg_adapter.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" + "time" + "github.com/michaelquigley/pfxlog" "github.com/openziti/channel/v4" "github.com/openziti/sdk-golang/edgexg" "github.com/openziti/sdk-golang/xgress" "github.com/openziti/sdk-golang/ziti/edge" "github.com/sirupsen/logrus" - "time" ) type XgAdapter struct { @@ -28,7 +29,7 @@ func (self *XgAdapter) HandleXgressClose(x *xgress.Xgress) { } // see note in close - self.conn.msgMux.RemoveMsgSink(self.conn) + self.conn.msgMux.Remove(self.conn) } func (self *XgAdapter) ForwardPayload(payload *xgress.Payload, _ *xgress.Xgress, ctx context.Context) { diff --git a/ziti/sdkinfo/build_info.go b/ziti/sdkinfo/build_info.go index 3f8c2c85..9d47ef1d 100644 --- a/ziti/sdkinfo/build_info.go +++ b/ziti/sdkinfo/build_info.go @@ -20,5 +20,5 @@ package sdkinfo const ( - Version = "v1.2.4" + Version = "v1.2.5" )