Skip to content

feat: Symmetric Configuration — Phase 2: InboundManager and Auto-Configuration #274

@meling

Description

@meling

Symmetric Configuration — Phase 2: InboundManager and Auto-Configuration

Parent: symmetric-configuration-design.md


Goal

Enable the server to track connected peers and provide a Configuration.
The InboundManager is a separate struct for clean decoupling from Server.


Peer Identity

Challenge

Using peer.FromContext(ctx) gives the client's ephemeral port, not its listening address:

Client listening on:     192.168.1.10:8080
Client connects from:    192.168.1.10:54321  ← ephemeral port
Server sees peer.Addr:   192.168.1.10:54321  ← cannot directly map to NodeID

Solution

The client includes its server listening address in the connection metadata.
Both the metadata key constant and the extraction logic live in one file (client_identity.go).

client_identity.go

// client_identity.go — peer identity for symmetric configurations.
// All gorums-addr metadata handling is co-located here.
package gorums

const gorumsAddrKey = "gorums-addr"

// identifyPeer extracts and validates the claimed server address from the
// stream context. It returns the validated address, or "" for external
// (non-replica) clients that do not send the gorums-addr metadata.
//
// The validation ensures that the peer's actual IP matches the IP in the
// claimed address, preventing address spoofing.
func identifyPeer(ctx context.Context) (claimedAddr string, err error) {
    // Get peer info for IP validation
    p, ok := peer.FromContext(ctx)
    if !ok {
        return "", nil
    }
    peerHost, _, _ := net.SplitHostPort(p.Addr.String())

    // Get claimed address from metadata
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return "", nil
    }
    addrs := md.Get(gorumsAddrKey)
    if len(addrs) == 0 {
        return "", nil // External client (non-replica)
    }
    claimedAddr = addrs[0]
    claimedHost, _, _ := net.SplitHostPort(claimedAddr)

    // Validate peer IP matches claimed address
    if peerHost != claimedHost {
        return "", fmt.Errorf("peer address mismatch: peer=%s, claimed=%s", peerHost, claimedHost)
    }
    return claimedAddr, nil
}

identifyPeer is stateless — it only extracts and validates the address.
The ID lookup is performed separately by the InboundManager, which holds the address → ID mapping.
This separation keeps identifyPeer testable as a pure function.

Benefits:

  • No extra bytes on every message (metadata only sent once at connection time)
  • Security validation via peer IP matching
  • External clients naturally get ID = 0
  • Address validation and ID lookup are independent concerns

InboundNodeOption Interface

Rather than modifying NodeListOption, we define a separate interface for server-side node configuration.
This avoids changing the existing NodeListOption contract while reusing the same normalizeAddr validation from config_opts.go.

// In config_opts.go

// InboundNodeOption configures which peers an InboundManager will accept.
// Only WithNodes (which provides explicit IDs) satisfies this interface;
// WithNodeList does not, since inbound tracking requires pre-assigned IDs.
type InboundNodeOption interface {
    Option
    newServerConfig(im *InboundManager) error
}

The nodeMap[T] type (returned by WithNodes) implements InboundNodeOption:

func (nm nodeMap[T]) newServerConfig(im *InboundManager) error {
    for id, node := range nm {
        if id == 0 {
            return fmt.Errorf("config: node 0 is reserved for external clients")
        }
        addr, err := normalizeAddr(node.Addr())
        if err != nil {
            return fmt.Errorf("config: invalid address %q: %w", node.Addr(), err)
        }
        if existingID, exists := im.knownNodes[addr]; exists {
            return fmt.Errorf("config: address %q already in use by node %d", addr, existingID)
        }
        im.knownNodes[addr] = id
    }
    return nil
}

The nodeList type (returned by WithNodeList) does not implement InboundNodeOption.
Attempting to pass WithNodeList(...) to NewInboundManager is a compile-time error.

This design:

  • Keeps NodeListOption unchanged — no new methods
  • Reuses normalizeAddr and address validation patterns from config_opts.go
  • Provides compile-time safety: only WithNodes (with explicit IDs) can configure inbound peers

InboundManager Struct

// In a new file: inbound_manager.go

// InboundManager manages server-side awareness of connected peers.
// It automatically creates inbound Nodes from server streams and
// maintains a live Configuration that can be used for server-initiated
// quorum calls, multicasts, and other call types.
//
// InboundManager is safe for concurrent use.
type InboundManager struct {
    mu         sync.RWMutex
    myID       uint32             // this server's own NodeID
    peers      map[uint32]*Node   // connected peers by NodeID
    inboundCfg Configuration      // auto-updated, sorted by ID
    knownNodes map[string]uint32  // normalized address → NodeID lookup
    nextMsgID  uint64             // atomic counter for server-initiated messages
    sendBuffer uint               // buffer size for inbound channels

    // Optional callback invoked when the inbound configuration changes.
    onConfigChange func(Configuration)
}

// NewInboundManager creates a new InboundManager.
// The myID parameter is this server's own NodeID.
// The opt parameter defines the expected set of peers using WithNodes.
func NewInboundManager(myID uint32, opt InboundNodeOption) (*InboundManager, error) {
    im := &InboundManager{
        myID:       myID,
        peers:      make(map[uint32]*Node),
        knownNodes: make(map[string]uint32),
    }
    if err := opt.newServerConfig(im); err != nil {
        return nil, err
    }
    return im, nil
}

// InboundConfig returns a Configuration of all currently connected peers.
// The configuration is automatically updated as peers connect/disconnect.
// It can be used with QuorumCall, Multicast, etc.
func (im *InboundManager) InboundConfig() Configuration {
    im.mu.RLock()
    defer im.mu.RUnlock()
    return im.inboundCfg
}

// getMsgID returns the next message ID for server-initiated calls.
func (im *InboundManager) getMsgID() uint64 {
    return atomic.AddUint64(&im.nextMsgID, 1)
}

// lookupNodeID returns the NodeID for the given address, if known.
func (im *InboundManager) lookupNodeID(addr string) (uint32, bool) {
    im.mu.RLock()
    defer im.mu.RUnlock()
    normalizedAddr, err := normalizeAddr(addr)
    if err != nil {
        return 0, false
    }
    id, ok := im.knownNodes[normalizedAddr]
    return id, ok
}

Peer Registration

registerPeer returns an unregister function for use with defer:

func (im *InboundManager) registerPeer(id uint32, addr string, strm stream.BidiStream, streamCtx context.Context) func() {
    im.mu.Lock()
    defer im.mu.Unlock()

    node := newInboundNode(id, addr, strm, streamCtx, nodeOptions{
        SendBufferSize: im.sendBuffer,
        MsgIDGen:       im.getMsgID,
    })
    im.peers[id] = node
    im.rebuildConfig()

    return func() {
        im.mu.Lock()
        defer im.mu.Unlock()
        if n, ok := im.peers[id]; ok {
            n.close()
            delete(im.peers, id)
            im.rebuildConfig()
        }
    }
}

func (im *InboundManager) rebuildConfig() {
    cfg := make(Configuration, 0, len(im.peers))
    for _, node := range im.peers {
        cfg = append(cfg, node)
    }
    OrderedBy(ID).Sort(cfg)
    im.inboundCfg = cfg
    if im.onConfigChange != nil {
        im.onConfigChange(im.inboundCfg)
    }
}

handleStream — Single Entry Point

The NodeStream handler should not contain peer identification logic inline.
Instead, InboundManager exposes a single handleStream method that encapsulates the entire identify → lookup → register flow:

// handleStream identifies a connecting peer and registers it if recognized.
// Returns a cleanup function that must be deferred, or nil if the peer
// is not a recognized replica (external client or unknown address).
func (im *InboundManager) handleStream(ctx context.Context, srv stream.BidiStream) (cleanup func(), err error) {
    claimedAddr, err := identifyPeer(ctx)
    if err != nil {
        return nil, err
    }
    if claimedAddr == "" {
        return nil, nil // External client
    }
    peerID, ok := im.lookupNodeID(claimedAddr)
    if !ok || peerID == 0 {
        return nil, nil // Unknown peer
    }
    unregister := im.registerPeer(peerID, claimedAddr, srv, ctx)
    return unregister, nil
}

The NodeStream handler becomes:

func (s *streamServer) NodeStream(srv stream.Gorums_NodeStreamServer) error {
    ctx := srv.Context()

    if s.inbound != nil {
        cleanup, err := s.inbound.handleStream(ctx, srv)
        if err != nil {
            return err
        }
        if cleanup != nil {
            defer cleanup()
        }
    }

    if s.opts.connectCallback != nil {
        s.opts.connectCallback(ctx)
    }

    // ... existing handler logic (unchanged) ...
}

newInboundNode

// In node.go

// newInboundNode creates a Node backed by an inbound server stream.
// Used by InboundManager to represent connected peers.
func newInboundNode(id uint32, addr string, strm stream.BidiStream, streamCtx context.Context, opts nodeOptions) *Node {
    return &Node{
        id:       id,
        addr:     addr,
        channel:  stream.NewInboundChannel(streamCtx, id, strm, opts.SendBufferSize),
        msgIDGen: opts.MsgIDGen,
    }
}

Integration with Server

The Server optionally holds an *InboundManager:

// In server.go

type Server struct {
    srv          *streamServer
    grpcServer   *grpc.Server
    interceptors []Interceptor
    inbound      *InboundManager // nil if symmetric mode not enabled
}

// WithInboundManager attaches an InboundManager to the server.
func WithInboundManager(im *InboundManager) ServerOption {
    return func(o *serverOptions) {
        o.inbound = im
    }
}

Symmetric Stream Tiebreaker

When both replicas A and B connect to each other, each server receives an inbound stream from the other. Meanwhile, each client has an outbound stream. This results in two streams per pair. The tiebreaker ensures only one stream is used:

func (im *InboundManager) registerPeer(id uint32, addr string, strm stream.BidiStream, streamCtx context.Context) func() {
    im.mu.Lock()
    defer im.mu.Unlock()

    if existing, ok := im.peers[id]; ok {
        // Deterministic tiebreaker: lower NodeID keeps its outbound stream
        if im.myID < id {
            return func() {} // Keep our outbound connection; reject this inbound
        }
        existing.close() // Close our outbound; use their inbound
    }

    node := newInboundNode(id, addr, strm, streamCtx, nodeOptions{
        SendBufferSize: im.sendBuffer,
        MsgIDGen:       im.getMsgID,
    })
    im.peers[id] = node
    im.rebuildConfig()

    return func() {
        im.mu.Lock()
        defer im.mu.Unlock()
        if n, ok := im.peers[id]; ok {
            n.close()
            delete(im.peers, id)
            im.rebuildConfig()
        }
    }
}

Verification

Unit Tests

Test Case Description
TestInboundManagerRegisterUnregister Verify peers registered/unregistered correctly
TestInboundManagerConfig Verify configuration updates on connect/disconnect
TestIdentifyPeer Verify peer IP matches claimed address
TestIdentifyPeerExternal Verify external clients return empty address
TestInboundNodeOption Verify WithNodes satisfies InboundNodeOption
TestInboundNodeOptionValidation Verify address normalization and ID validation
TestHandleStream Verify single-method stream handling flow
TestHandleStreamReturnsCleanup Verify cleanup function unregisters peer
TestSymmetricStreamTiebreaker Verify deterministic stream selection

Commands

# Run gorums package tests
go test -v -timeout=15s ./...

# Run specific inbound manager tests
go test -v -run TestInboundManager ./...
go test -v -run TestIdentifyPeer ./...
go test -v -run TestHandleStream ./...

Acceptance Criteria

  • InboundNodeOption interface defined in config_opts.go
  • nodeMap[T] implements both NodeListOption and InboundNodeOption
  • nodeList does NOT implement InboundNodeOption (compile-time check)
  • InboundManager tracks peers and provides live Configuration
  • registerPeer returns an unregister cleanup function
  • handleStream encapsulates identify → lookup → register flow
  • identifyPeer is stateless and validates peer IP against claimed address
  • client_identity.go contains gorumsAddrKey and identifyPeer
  • All existing tests pass

Metadata

Metadata

Assignees

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions