Skip to content

Commit

Permalink
Merge 5ec9935 into 73f2531
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Feb 18, 2021
2 parents 73f2531 + 5ec9935 commit a3b1ac1
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 239 deletions.
37 changes: 34 additions & 3 deletions server/clustering.go
@@ -1,4 +1,4 @@
// Copyright 2017-2020 The NATS Authors
// Copyright 2017-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -20,6 +20,7 @@ import (
"io"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -70,6 +71,20 @@ type ClusteringOptions struct {
Sync bool // Do a file sync after every write to the Raft log and message store.
RaftLogging bool // Enable logging of Raft library (disabled by default since really verbose).

// Enable creation of dedicated NATS connections to communicate with other
// nodes. Normally, the server has a single NATS connection and subscribes
// to a subject where other nodes can submit requests to "connect" to it.
// When a remote connects, a new subscription on an inbox is created on
// both sides and they use their single "raft" NATS connection to communicate.
// If node "A" connects to both "B" and "C" it will have two subscriptions
// and two "outbox" subjects (on per remote node) to which send data to.
//
// With this option enabled, NATS connection(s) will be created per remote
// node. This should help with performance and reduce contention.
// The RAFT transport is pooling connections, so there may be more than
// one connection per remote node.
NodesConnections bool

// If this is enabled, the leader of the cluster will listen to add/remove
// requests on NATS subject "_STAN.raft.<cluster ID>.node.[add|remove]".
// Admin can/should limit permissions to send to this subject to prevent
Expand Down Expand Up @@ -379,8 +394,11 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
return false, err
}

// TODO: using a single NATS conn for every channel might be a bottleneck. Maybe pool conns?
transport, err := newNATSTransport(addr, s.ncr, tportTimeout, logWriter)
var makeConn natsRaftConnCreator
if s.opts.Clustering.NodesConnections {
makeConn = s.createNewRaftNATSConn
}
transport, err := newNATSTransport(addr, s.ncr, tportTimeout, logWriter, makeConn)
if err != nil {
store.Close()
return false, err
Expand Down Expand Up @@ -473,6 +491,13 @@ func (s *StanServer) createRaftNode(name string) (bool, error) {
return existingState, nil
}

func (s *StanServer) createNewRaftNATSConn(name string) (*nats.Conn, error) {
remoteNodeID := strings.TrimPrefix(name, s.opts.ID+".")
remoteNodeID = strings.TrimSuffix(remoteNodeID, "."+s.opts.ID)
conn, err := s.createNatsClientConn(s.opts.Clustering.NodeID + "-to-" + remoteNodeID)
return conn, err
}

// bootstrapCluster bootstraps the node for the provided Raft group either as a
// seed node or with the given peer configuration, depending on configuration
// and with the latter taking precedence.
Expand Down Expand Up @@ -502,10 +527,16 @@ func (s *StanServer) bootstrapCluster(name string, node *raft.Raft) error {
return node.BootstrapCluster(config).Error()
}

// This is bad because we have something like: "test-cluster.a.test-cluster",
// unfortunately, we can't change now without breaking backward compatibility,
// because new/old servers would not be able to connect to each other, since
// this is used for the subscription's subject to accept/send requests between
// nodes.
func (s *StanServer) getClusteringAddr(raftName string) string {
return s.getClusteringPeerAddr(raftName, s.opts.Clustering.NodeID)
}

// See comment above...
func (s *StanServer) getClusteringPeerAddr(raftName, nodeID string) string {
return fmt.Sprintf("%s.%s.%s", s.opts.ID, nodeID, raftName)
}
Expand Down
3 changes: 2 additions & 1 deletion server/clustering_test.go
@@ -1,4 +1,4 @@
// Copyright 2017-2020 The NATS Authors
// Copyright 2017-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -106,6 +106,7 @@ func getTestDefaultOptsForClustering(id string, bootstrap bool) *Options {
opts.Clustering.LogCacheSize = DefaultLogCacheSize
opts.Clustering.LogSnapshots = 1
opts.Clustering.RaftLogging = true
opts.Clustering.NodesConnections = true
opts.NATSServerURL = "nats://127.0.0.1:4222"
return opts
}
Expand Down
7 changes: 6 additions & 1 deletion server/conf.go
@@ -1,4 +1,4 @@
// Copyright 2016-2019 The NATS Authors
// Copyright 2016-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -361,6 +361,11 @@ func parseCluster(itf interface{}, opts *Options) error {
return err
}
opts.Clustering.BoltFreeListArray = v.(bool)
case "nodes_connections":
if err := checkType(k, reflect.Bool, v); err != nil {
return err
}
opts.Clustering.NodesConnections = v.(bool)
}
}
return nil
Expand Down
5 changes: 4 additions & 1 deletion server/conf_test.go
@@ -1,4 +1,4 @@
// Copyright 2016-2019 The NATS Authors
// Copyright 2016-2021 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -281,6 +281,9 @@ func TestParseConfig(t *testing.T) {
if !opts.Clustering.BoltFreeListArray {
t.Fatal("Expected BoltFreeListArray to be true")
}
if !opts.Clustering.NodesConnections {
t.Fatal("Expected NodesConnections to be true")
}
if opts.SQLStoreOpts.Driver != "mysql" {
t.Fatalf("Expected SQL Driver to be %q, got %q", "mysql", opts.SQLStoreOpts.Driver)
}
Expand Down

0 comments on commit a3b1ac1

Please sign in to comment.