Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6025d2b
refactor: simplify NewConfig signature, add Configuration.Close
meling Mar 24, 2026
a5dc9cf
refactor: update all callers to use NewConfig
meling Mar 24, 2026
958fca1
chore: make NewConfig error message consistent with NewConfiguration
meling Mar 24, 2026
fd90518
chore: remove unused NodeIDs and Size methods from outboundManager
meling Mar 24, 2026
9615f6f
chore: remove deprecated Manager methods and aliases from configuration
meling Mar 24, 2026
6872ce2
chore: replace remaining Manager references with outboundManager
meling Mar 24, 2026
4e03b6e
doc: update ConfigContext and Configuration comments for clarity
meling Mar 24, 2026
98ea597
chore: clarify comments for owning manager in Node struct
meling Mar 24, 2026
ef4c963
chore: remove NewManager from TestOption docs
meling Mar 24, 2026
0a43a45
chore: rename TestNewConfiguration to TestNewConfig for consistency
meling Mar 24, 2026
4d373c4
refactor(testing): add TestDialOptions and unify getOrCreateManager
meling Mar 24, 2026
af123c3
refactor(test): replace TestManager+NewConfiguration with NewConfig
meling Mar 24, 2026
dadc7ee
refactor(test): replace withRequestHandler with WithServer
meling Mar 24, 2026
25d9124
chore: remove deprecated NewConfiguration function
meling Mar 24, 2026
85643ca
chore: remove unused TestManager function
meling Mar 24, 2026
ca0cf19
chore: update various mgr references no longer valid
meling Mar 24, 2026
07cde49
docs(user-guide): update client API examples to use NewConfig
meling Mar 24, 2026
d1e0494
test: update TestConfig to use 6 nodes and adjust configuration creation
meling Mar 24, 2026
d915c9c
refactor(testing): simplify TestConfiguration to use NewConfig directly
meling Mar 24, 2026
b90a431
refactor: reorganize Configuration and ConfigContext definitions
meling Mar 25, 2026
c3f9ae5
doc: rename variable 'cfg' to 'config' for consistency in user guide
meling Mar 25, 2026
1f56384
doc: update context in ExampleConfigClient to use Background context
meling Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 12 additions & 29 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,32 @@ import (
"slices"
)

// ConfigContext is a context that carries a configuration for quorum calls.
// It embeds context.Context and provides access to the Configuration.
// Configuration represents a static set of nodes on which multicast or
// quorum calls may be invoked. A configuration is created using [NewConfig].
// A configuration should be treated as immutable. Therefore, methods that
// operate on a configuration always return a new Configuration instance.
type Configuration []*Node

// ConfigContext is a context that carries a configuration for multicast or
// quorum calls. It embeds context.Context and provides access to the configuration.
//
// Use [Configuration.Context] to create a ConfigContext from an existing context.
type ConfigContext struct {
context.Context
cfg Configuration
}

// Configuration returns the Configuration associated with this context.
// Configuration returns the configuration associated with this context.
func (c ConfigContext) Configuration() Configuration {
return c.cfg
}

// Configuration represents a static set of nodes on which quorum calls may be invoked.
// A configuration is created using [NewConfiguration] or [NewConfig]. A configuration
// should be treated as immutable. Therefore, methods that operate on a configuration
// always return a new Configuration instance.
type Configuration []*Node

// Context creates a new ConfigContext from the given parent context
// and this configuration.
//
// Example:
//
// config, _ := gorums.NewConfiguration(mgr, gorums.WithNodeList(addrs))
// config, _ := gorums.NewConfig(gorums.WithNodeList(addrs), dialOpts...)
// cfgCtx := config.Context(context.Background())
// resp, err := paxos.Prepare(cfgCtx, req)
func (c Configuration) Context(parent context.Context) *ConfigContext {
Expand All @@ -42,14 +42,6 @@ func (c Configuration) Context(parent context.Context) *ConfigContext {
return &ConfigContext{Context: parent, cfg: c}
}

// Deprecated: Use [NewConfig] instead.
func NewConfiguration(mgr *Manager, opt NodeListOption) (nodes Configuration, err error) {
if opt == nil {
return nil, fmt.Errorf("config: missing required node list")
}
return opt.newConfig(mgr)
}

// NewConfig returns a new [Configuration] based on the provided nodes and dial options.
//
// Example:
Expand All @@ -60,10 +52,10 @@ func NewConfiguration(mgr *Manager, opt NodeListOption) (nodes Configuration, er
// )
func NewConfig(nodes NodeListOption, opts ...DialOption) (Configuration, error) {
if nodes == nil {
return nil, fmt.Errorf("gorums: missing required NodeListOption")
return nil, fmt.Errorf("config: missing required node list")
}
mgr := newOutboundManager(opts...)
cfg, err := NewConfiguration(mgr, nodes)
cfg, err := nodes.newConfig(mgr)
if err != nil {
_ = mgr.Close()
return nil, err
Expand All @@ -72,7 +64,6 @@ func NewConfig(nodes NodeListOption, opts ...DialOption) (Configuration, error)
}

// Extend returns a new Configuration combining c with new nodes from the provided NodeListOption.
// This is the only way to add nodes that are not yet registered with the manager.
func (c Configuration) Extend(opt NodeListOption) (Configuration, error) {
if len(c) == 0 {
return nil, fmt.Errorf("config: cannot extend empty configuration")
Expand Down Expand Up @@ -120,14 +111,6 @@ func (c Configuration) Equal(b Configuration) bool {
return true
}

// Manager returns the Manager that manages this configuration's nodes.
// Returns nil if the configuration is empty.
//
// Deprecated: Use [Configuration.Close] to close the configuration instead.
func (c Configuration) Manager() *Manager {
return c.mgr()
}

// mgr returns the outboundManager for this configuration's nodes.
func (c Configuration) mgr() *outboundManager {
if len(c) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (n testNode) Addr() string {
return n.addr
}

func TestNewConfiguration(t *testing.T) {
func TestNewConfig(t *testing.T) {
tests := []struct {
name string
opt gorums.NodeListOption
Expand Down
144 changes: 65 additions & 79 deletions doc/user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,11 @@ func ExampleStorageServer(port int) {
## Implementing the StorageClient

Next, we write client code to call RPCs on our servers.
The first thing we need to do is to create an instance of the `Manager` type.
The manager maintains a pool of connections to nodes.
Nodes are added to the connection pool via new configurations, as shown below.
The first thing we need to do is to create a `Configuration` using `gorums.NewConfig`.
`NewConfig` establishes connections to the given nodes and returns a configuration
ready for making RPC calls.

The manager takes as arguments a set of optional manager options.
We can forward gRPC dial options to the manager if needed.
The manager will use these options when connecting to nodes.
We can forward gRPC dial options to `NewConfig` if needed.
Below we use only a simple insecure connection option.

```go
Expand All @@ -345,32 +343,28 @@ import (
)

func ExampleStorageClient() {
mgr := NewManager(
gorums.WithDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
)
```

A configuration is a set of nodes on which our RPC calls can be invoked.
Using the `WithNodeList` option, the manager assigns a unique identifier to each node.
The code below shows how to create a configuration:

```go
// Get all all available node ids, 3 nodes
addrs := []string{
"127.0.0.1:8080",
"127.0.0.1:8081",
"127.0.0.1:8082",
}
// Create a configuration including all nodes
allNodesConfig, err := NewConfiguration(mgr, gorums.WithNodeList(addrs))
allNodesConfig, err := gorums.NewConfig(
gorums.WithNodeList(addrs),
gorums.WithDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
)
if err != nil {
log.Fatalln("error creating read config:", err)
}
defer allNodesConfig.Close()
```

The `Manager` and `Configuration` types also have a few other available methods.
A configuration is a set of nodes on which RPC calls can be invoked.
`WithNodeList` assigns a unique identifier to each node by address.

The `Configuration` type has several useful methods for combining and filtering configurations.
Inspect the package documentation or source code for details.

We can now invoke the WriteUnicast RPC on each `node` in the configuration:
Expand Down Expand Up @@ -597,17 +591,17 @@ func ExampleStorageClient() {
"127.0.0.1:8082",
}

mgr := gorums.NewManager(
// Create a configuration with all nodes
config, err := gorums.NewConfig(
gorums.WithNodeList(addrs),
gorums.WithDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
)

// Create a configuration with all nodes
cfg, err := NewConfiguration(mgr, gorums.WithNodeList(addrs))
if err != nil {
log.Fatalln("error creating configuration:", err)
}
defer config.Close()

ctx := context.Background()
cfgCtx := config.Context(ctx)
Expand Down Expand Up @@ -1090,7 +1084,7 @@ Gorums defines several sentinel errors that commonly appear as the cause of a `Q
Here's how to properly handle errors from a quorum call:

```go
func handleQuorumCall(cfg *gorums.Configuration, req *ReadRequest) {
func handleQuorumCall(config *gorums.Configuration, req *ReadRequest) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand Down Expand Up @@ -1168,18 +1162,17 @@ if err != nil {
var qcErr gorums.QuorumCallError
if errors.As(err, &qcErr) {
// Option 1: Exclude all failed nodes
newConfig, err := NewConfiguration(mgr, config.WithoutErrors(qcErr))
newConfig := config.WithoutErrors(qcErr)

// Option 2: Exclude only nodes with specific error types
// For example, exclude only nodes that timed out
newConfig, err := NewConfiguration(mgr, config.WithoutErrors(qcErr, context.DeadlineExceeded))
newConfig = config.WithoutErrors(qcErr, context.DeadlineExceeded)

// Option 3: Exclude nodes with multiple specific error types
newConfig, err := NewConfiguration(mgr, config.WithoutErrors(qcErr,
context.DeadlineExceeded,
context.Canceled,
io.EOF,
),
newConfig = config.WithoutErrors(qcErr,
context.DeadlineExceeded,
context.Canceled,
io.EOF,
)

// Retry the operation with the new configuration
Expand All @@ -1196,7 +1189,7 @@ This allows you to filter nodes based on the underlying cause of their failures,

Below is an example demonstrating how to work with configurations.
These configurations are viewed from the client's perspective, and to actually make quorum calls on these configurations, there must be server endpoints to connect to.
We ignore the construction of `mgr` and error handling (except for the last configuration).
Error handling is omitted for brevity except where the result is used.

In the example below, we simply use fixed quorum sizes.

Expand All @@ -1207,56 +1200,49 @@ func ExampleConfigClient() {
"127.0.0.1:8081",
"127.0.0.1:8082",
}
// Make configuration c1 from addrs, giving |c1| = |addrs| = 3
c1, _ := NewConfiguration(mgr,
// Create base configuration c1 from addrs, giving |c1| = 3.
c1, err := gorums.NewConfig(
gorums.WithNodeList(addrs),
gorums.WithDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
)
if err != nil {
log.Fatalln("error creating configuration:", err)
}
defer c1.Close()

newAddrs := []string{
"127.0.0.1:9080",
"127.0.0.1:9081",
}
// Make configuration c2 from newAddrs, giving |c2| = |newAddrs| = 2
c2, _ := NewConfiguration(mgr,
gorums.WithNodeList(newAddrs),
)
// Extend c1 with newAddrs; c2 shares c1's connection pool, |c2| = |c1| + |newAddrs| = 5.
c2, _ := c1.Extend(gorums.WithNodeList(newAddrs))

// Make new configuration c3 from c1 and newAddrs, giving |c3| = |c1| + |newAddrs| = 3+2=5
c3, _ := NewConfiguration(mgr,
c1.WithNewNodes(gorums.WithNodeList(newAddrs)),
)
// c3 = nodes in c2 not in c1, giving |c3| = |newAddrs| = 2.
c3 := c2.Difference(c1)

// Make new configuration c4 from c1 and c2, giving |c4| = |c1| + |c2| = 3+2=5
c4, _ := NewConfiguration(mgr,
c1.And(c2),
)
// c4 = union of c1 and c3, giving |c4| = |c1| + |c3| = 3+2 = 5.
c4 := c1.Union(c3)

// Make new configuration c5 from c1 except the first node from c1, giving |c5| = |c1| - 1 = 3-1 = 2
c5, _ := NewConfiguration(mgr,
c1.WithoutNodes(c1.NodeIDs()[0]),
)
// c5 = c1 without its first node, giving |c5| = |c1| - 1 = 2.
c5 := c1.Remove(c1.NodeIDs()[0])

// Make new configuration c6 from c3 except c1, giving |c6| = |c3| - |c1| = 5-3 = 2
c6, _ := NewConfiguration(mgr,
c3.Except(c1),
)
// c6 = c2 without c1, giving |c6| = |c2| - |c1| = 5-3 = 2.
c6 := c2.Difference(c1)

// Example: Handling quorum call failures and creating a new configuration
// without failed nodes
cfgCtx := c1.Context(ctx)
// without failed nodes.
cfgCtx := c1.Context(context.Background())
state, err := ReadQC(cfgCtx, &ReadRequest{}).Majority()
if err != nil {
var qcErr gorums.QuorumCallError
if errors.As(err, &qcErr) {
// Create a new configuration excluding all nodes that failed
c7, _ := NewConfiguration(mgr,
c1.WithoutErrors(qcErr),
)

// Or exclude only nodes with specific error types (e.g., timeout errors)
c8, _ := NewConfiguration(mgr,
c1.WithoutErrors(qcErr, context.DeadlineExceeded),
)
// Create a new configuration excluding all nodes that failed.
c7 := c1.WithoutErrors(qcErr)

// Or exclude only nodes with specific error types (e.g., timeout errors).
c8 := c1.WithoutErrors(qcErr, context.DeadlineExceeded)
}
}
}
Expand Down Expand Up @@ -1394,28 +1380,28 @@ Without `Release()`, the server would block all other inbound messages until the
// ReadNestedQC is a quorum-call handler that fans out a nested ReadQC
// to all known connected peers and returns the most recent value.
func (s *storageServer) ReadNestedQC(ctx gorums.ServerCtx, req *pb.ReadRequest) (*pb.ReadResponse, error) {
cfg := ctx.Config()
if len(cfg) == 0 {
config := ctx.Config()
if len(config) == 0 {
return nil, fmt.Errorf("ReadNestedQC requires a server peer configuration")
}
// Release the handler lock before making nested outbound calls to avoid
// blocking inbound message processing on this server.
ctx.Release()
return newestValue(pb.ReadQC(cfg.Context(ctx), req))
return newestValue(pb.ReadQC(config.Context(ctx), req))
}
```

The same pattern applies to nested multicast:

```go
func (s *storageServer) WriteNestedMulticast(ctx gorums.ServerCtx, req *pb.WriteRequest) (*pb.WriteResponse, error) {
cfg := ctx.Config()
if len(cfg) == 0 {
return pb.WriteResponse_builder{New: false}.Build(), fmt.Errorf("WriteNestedMulticast requires a server peer configuration")
config := ctx.Config()
if len(config) == 0 {
return nil, fmt.Errorf("write_nested_multicast: requires server peer configuration")
}
ctx.Release()
if err := pb.WriteMulticast(cfg.Context(ctx), req); err != nil {
return pb.WriteResponse_builder{New: false}.Build(), err
if err := pb.WriteMulticast(config.Context(ctx), req); err != nil {
return nil, fmt.Errorf("write_nested_multicast: %w", err)
}
return pb.WriteResponse_builder{New: true}.Build(), nil
}
Expand Down Expand Up @@ -1499,7 +1485,7 @@ clientSrv := gorums.NewServer()
clientSrv.RegisterHandler(pb.MyMethod, myHandler)

// Connect to the server; NewConfig wires up the back-channel dispatcher automatically.
cfg, err := clientSrv.NewConfig(
config, err := clientSrv.NewConfig(
gorums.WithNodeList(serverAddrs),
gorums.WithDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())),
)
Expand Down Expand Up @@ -1532,12 +1518,12 @@ The handler reads `ctx.ClientConfig()` to reach all currently connected client p
```go
// ReadNestedQC fans out a ReadQC to all clients that have connected.
func (s *storageServer) ReadNestedQC(ctx gorums.ServerCtx, req *pb.ReadRequest) (*pb.ReadResponse, error) {
cfg := ctx.ClientConfig()
if len(cfg) == 0 {
config := ctx.ClientConfig()
if len(config) == 0 {
return nil, fmt.Errorf("ReadNestedQC: no client peers connected")
}
ctx.Release()
return newestValue(pb.ReadQC(cfg.Context(ctx), req))
return newestValue(pb.ReadQC(config.Context(ctx), req))
}
```

Expand Down
4 changes: 2 additions & 2 deletions examples/storage/repl.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func (r repl) parseConfiguration(cfgStr string) (pb.Configuration, error) {
}

nodes := make([]*pb.Node, 0, len(indices))
mgrNodes := r.cfg.Nodes()
cfgNodes := r.cfg.Nodes()
for _, i := range indices {
nodes = append(nodes, mgrNodes[i])
nodes = append(nodes, cfgNodes[i])
}
gorums.OrderedBy(gorums.ID).Sort(nodes)
return pb.Configuration(nodes), nil
Expand Down
Loading
Loading