diff --git a/server/clustering.go b/server/clustering.go index 25fdc27e..aae5000a 100644 --- a/server/clustering.go +++ b/server/clustering.go @@ -1,4 +1,4 @@ -// Copyright 2017-2020 The NATS Authors +// Copyright 2017-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -20,6 +20,7 @@ import ( "io" "os" "path/filepath" + "strings" "sync" "sync/atomic" "time" @@ -70,6 +71,20 @@ type ClusteringOptions struct { Sync bool // Do a file sync after every write to the Raft log and message store. RaftLogging bool // Enable logging of Raft library (disabled by default since really verbose). + // Enable creation of dedicated NATS connections to communicate with other + // nodes. Normally, the server has a single NATS connection and subscribes + // to a subject where other nodes can submit requests to "connect" to it. + // When a remote connects, a new subscription on an inbox is created on + // both sides and they use their single "raft" NATS connection to communicate. + // If node "A" connects to both "B" and "C" it will have two subscriptions + // and two "outbox" subjects (on per remote node) to which send data to. + // + // With this option enabled, NATS connection(s) will be created per remote + // node. This should help with performance and reduce contention. + // The RAFT transport is pooling connections, so there may be more than + // one connection per remote node. + NodesConnections bool + // If this is enabled, the leader of the cluster will listen to add/remove // requests on NATS subject "_STAN.raft..node.[add|remove]". // Admin can/should limit permissions to send to this subject to prevent @@ -379,8 +394,11 @@ func (s *StanServer) createRaftNode(name string) (bool, error) { return false, err } - // TODO: using a single NATS conn for every channel might be a bottleneck. Maybe pool conns? - transport, err := newNATSTransport(addr, s.ncr, tportTimeout, logWriter) + var makeConn natsRaftConnCreator + if s.opts.Clustering.NodesConnections { + makeConn = s.createNewRaftNATSConn + } + transport, err := newNATSTransport(addr, s.ncr, tportTimeout, logWriter, makeConn) if err != nil { store.Close() return false, err @@ -473,6 +491,13 @@ func (s *StanServer) createRaftNode(name string) (bool, error) { return existingState, nil } +func (s *StanServer) createNewRaftNATSConn(name string) (*nats.Conn, error) { + remoteNodeID := strings.TrimPrefix(name, s.opts.ID+".") + remoteNodeID = strings.TrimSuffix(remoteNodeID, "."+s.opts.ID) + conn, err := s.createNatsClientConn(s.opts.Clustering.NodeID + "-to-" + remoteNodeID) + return conn, err +} + // bootstrapCluster bootstraps the node for the provided Raft group either as a // seed node or with the given peer configuration, depending on configuration // and with the latter taking precedence. @@ -502,10 +527,16 @@ func (s *StanServer) bootstrapCluster(name string, node *raft.Raft) error { return node.BootstrapCluster(config).Error() } +// This is bad because we have something like: "test-cluster.a.test-cluster", +// unfortunately, we can't change now without breaking backward compatibility, +// because new/old servers would not be able to connect to each other, since +// this is used for the subscription's subject to accept/send requests between +// nodes. func (s *StanServer) getClusteringAddr(raftName string) string { return s.getClusteringPeerAddr(raftName, s.opts.Clustering.NodeID) } +// See comment above... func (s *StanServer) getClusteringPeerAddr(raftName, nodeID string) string { return fmt.Sprintf("%s.%s.%s", s.opts.ID, nodeID, raftName) } diff --git a/server/clustering_test.go b/server/clustering_test.go index 95963c1a..c4d4fb42 100644 --- a/server/clustering_test.go +++ b/server/clustering_test.go @@ -1,4 +1,4 @@ -// Copyright 2017-2020 The NATS Authors +// Copyright 2017-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -106,6 +106,7 @@ func getTestDefaultOptsForClustering(id string, bootstrap bool) *Options { opts.Clustering.LogCacheSize = DefaultLogCacheSize opts.Clustering.LogSnapshots = 1 opts.Clustering.RaftLogging = true + opts.Clustering.NodesConnections = true opts.NATSServerURL = "nats://127.0.0.1:4222" return opts } diff --git a/server/conf.go b/server/conf.go index e907ef11..8fe7cec4 100644 --- a/server/conf.go +++ b/server/conf.go @@ -1,4 +1,4 @@ -// Copyright 2016-2019 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -361,6 +361,11 @@ func parseCluster(itf interface{}, opts *Options) error { return err } opts.Clustering.BoltFreeListArray = v.(bool) + case "nodes_connections": + if err := checkType(k, reflect.Bool, v); err != nil { + return err + } + opts.Clustering.NodesConnections = v.(bool) } } return nil diff --git a/server/conf_test.go b/server/conf_test.go index 105d31d2..0e81d9a2 100644 --- a/server/conf_test.go +++ b/server/conf_test.go @@ -1,4 +1,4 @@ -// Copyright 2016-2019 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -281,6 +281,9 @@ func TestParseConfig(t *testing.T) { if !opts.Clustering.BoltFreeListArray { t.Fatal("Expected BoltFreeListArray to be true") } + if !opts.Clustering.NodesConnections { + t.Fatal("Expected NodesConnections to be true") + } if opts.SQLStoreOpts.Driver != "mysql" { t.Fatalf("Expected SQL Driver to be %q, got %q", "mysql", opts.SQLStoreOpts.Driver) } diff --git a/server/raft_transport.go b/server/raft_transport.go index 12a0468a..a01aaefd 100644 --- a/server/raft_transport.go +++ b/server/raft_transport.go @@ -1,4 +1,4 @@ -// Copyright 2017-2019 The NATS Authors +// Copyright 2017-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -38,6 +38,8 @@ const ( natsLogAppName = "raft-nats" ) +type natsRaftConnCreator func(name string) (*nats.Conn, error) + // natsAddr implements the net.Addr interface. An address for the NATS // transport is simply a node id, which is then used to construct an inbox. type natsAddr string @@ -63,26 +65,57 @@ type connectResponseProto struct { // connection between two peers. It does this by establishing a unique inbox at // each endpoint which the peers use to stream data to each other. type natsConn struct { + mu sync.RWMutex conn *nats.Conn + streamConn bool localAddr natsAddr remoteAddr natsAddr sub *nats.Subscription + subTimeout time.Duration + pending []byte outbox string - mu sync.RWMutex closed bool - reader *timeoutReader - writer io.WriteCloser parent *natsStreamLayer } func (n *natsConn) Read(b []byte) (int, error) { n.mu.RLock() closed := n.closed + subTimeout := n.subTimeout + if subTimeout == 0 { + subTimeout = time.Duration(0x7FFFFFFFFFFFFFFF) + } n.mu.RUnlock() if closed { - return 0, errors.New("read from closed conn") + return 0, io.EOF + } + buf := n.pending + if size := len(buf); size > 0 { + nb := copy(b, buf[:len(b)]) + if nb != size { + buf = buf[nb:] + } else { + buf = nil + } + n.pending = buf + return nb, nil + } + msg, err := n.sub.NextMsg(subTimeout) + if err != nil { + return 0, err + } + if len(msg.Data) == 0 { + n.close(false) + return 0, io.EOF + } + buf = msg.Data + if nb := len(buf); nb <= len(b) { + copy(b, buf) + return nb, nil } - return n.reader.Read(b) + nb := copy(b, buf[:len(b)]) + n.pending = buf[nb:] + return nb, nil } func (n *natsConn) Write(b []byte) (int, error) { @@ -90,7 +123,7 @@ func (n *natsConn) Write(b []byte) (int, error) { closed := n.closed n.mu.RUnlock() if closed { - return 0, errors.New("write to closed conn") + return 0, io.EOF } if len(b) == 0 { @@ -115,16 +148,11 @@ func (n *natsConn) Close() error { func (n *natsConn) close(signalRemote bool) error { n.mu.Lock() - defer n.mu.Unlock() - if n.closed { + n.mu.Unlock() return nil } - if err := n.sub.Unsubscribe(); err != nil { - return err - } - if signalRemote { // Send empty message to signal EOF for a graceful disconnect. Not // concerned with errors here as this is best effort. @@ -133,13 +161,27 @@ func (n *natsConn) close(signalRemote bool) error { n.conn.FlushTimeout(500 * time.Millisecond) } + // If connection is owned by stream, simply unsubscribe. Note that we + // check for sub != nil because this can be called during setup where + // sub has not been attached. + var err error + if n.streamConn { + if n.sub != nil { + err = n.sub.Unsubscribe() + } + } else { + n.conn.Close() + } + n.closed = true - n.parent.mu.Lock() - delete(n.parent.conns, n) - n.parent.mu.Unlock() - n.writer.Close() + stream := n.parent + n.mu.Unlock() - return nil + stream.mu.Lock() + delete(stream.conns, n) + stream.mu.Unlock() + + return err } func (n *natsConn) LocalAddr() net.Addr { @@ -151,34 +193,28 @@ func (n *natsConn) RemoteAddr() net.Addr { } func (n *natsConn) SetDeadline(t time.Time) error { - if err := n.SetReadDeadline(t); err != nil { - return err + n.mu.Lock() + if t.IsZero() { + n.subTimeout = 0 + } else { + n.subTimeout = time.Until(t) } - return n.SetWriteDeadline(t) + n.mu.Unlock() + return nil } func (n *natsConn) SetReadDeadline(t time.Time) error { - n.reader.SetDeadline(t) - return nil + return n.SetDeadline(t) } func (n *natsConn) SetWriteDeadline(t time.Time) error { - return nil -} - -func (n *natsConn) msgHandler(msg *nats.Msg) { - // Check if remote peer disconnected. - if len(msg.Data) == 0 { - n.close(false) - return - } - - n.writer.Write(msg.Data) + return n.SetDeadline(t) } // natsStreamLayer implements the raft.StreamLayer interface. type natsStreamLayer struct { conn *nats.Conn + makeConn natsRaftConnCreator localAddr natsAddr sub *nats.Subscription logger hclog.Logger @@ -189,10 +225,11 @@ type natsStreamLayer struct { dfTimeout time.Duration } -func newNATSStreamLayer(id string, conn *nats.Conn, logger hclog.Logger, timeout time.Duration) (*natsStreamLayer, error) { +func newNATSStreamLayer(id string, conn *nats.Conn, logger hclog.Logger, timeout time.Duration, makeConn natsRaftConnCreator) (*natsStreamLayer, error) { n := &natsStreamLayer{ localAddr: natsAddr(id), conn: conn, + makeConn: makeConn, logger: logger, conns: map[*natsConn]struct{}{}, dfTimeout: timeoutForDialAndFlush, @@ -214,17 +251,26 @@ func newNATSStreamLayer(id string, conn *nats.Conn, logger hclog.Logger, timeout return n, nil } -func (n *natsStreamLayer) newNATSConn(address string) *natsConn { - // TODO: probably want a buffered pipe. - reader, writer := io.Pipe() - return &natsConn{ - conn: n.conn, +func (n *natsStreamLayer) newNATSConn(address string) (*natsConn, error) { + var conn *nats.Conn + var err error + + c := &natsConn{ localAddr: n.localAddr, remoteAddr: natsAddr(address), - reader: newTimeoutReader(reader), - writer: writer, parent: n, } + if n.makeConn == nil { + c.conn = n.conn + c.streamConn = true + } else { + conn, err = n.makeConn(address) + if err != nil { + return nil, err + } + c.conn = conn + } + return c, nil } // Dial creates a new net.Conn with the remote address. This is implemented by @@ -235,12 +281,6 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration return nil, errors.New("raft-nats: dial failed, not connected") } - // QUESTION: The Raft NetTransport does connection pooling, which is useful - // for TCP sockets. The NATS transport simulates a socket using a - // subscription at each endpoint, but everything goes over the same NATS - // socket. This means there is little advantage to pooling here currently. - // Should we actually Dial a new NATS connection here and rely on pooling? - connect := &connectRequestProto{ ID: n.localAddr.String(), Inbox: fmt.Sprintf(natsRequestInbox, n.localAddr.String(), nats.NewInbox()), @@ -250,35 +290,46 @@ func (n *natsStreamLayer) Dial(address raft.ServerAddress, timeout time.Duration panic(err) } - peerConn := n.newNATSConn(string(address)) + // When creating the transport, we pass a 10s timeout, but for Dial, we want + // to use a different timeout, unless the one provided is smaller. + if timeout > n.dfTimeout { + timeout = n.dfTimeout + } - // Setup inbox. - sub, err := n.conn.Subscribe(connect.Inbox, peerConn.msgHandler) + // Make connect request to peer. + msg, err := n.conn.Request(fmt.Sprintf(natsConnectInbox, address), data, timeout) if err != nil { return nil, err } - sub.SetPendingLimits(-1, -1) - if err := n.conn.FlushTimeout(n.dfTimeout); err != nil { - sub.Unsubscribe() + var resp connectResponseProto + if err := json.Unmarshal(msg.Data, &resp); err != nil { return nil, err } - // Make connect request to peer. - msg, err := n.conn.Request(fmt.Sprintf(natsConnectInbox, address), data, n.dfTimeout) + // Success, so now create a new NATS connection... + peerConn, err := n.newNATSConn(string(address)) if err != nil { - sub.Unsubscribe() - return nil, err + return nil, fmt.Errorf("raft-nats: unable to create connection to %q: %v", string(address), err) } - var resp connectResponseProto - if err := json.Unmarshal(msg.Data, &resp); err != nil { - sub.Unsubscribe() + + // Setup inbox. + sub, err := peerConn.conn.SubscribeSync(connect.Inbox) + if err != nil { + peerConn.Close() return nil, err } + sub.SetPendingLimits(-1, -1) peerConn.mu.Lock() peerConn.sub = sub peerConn.outbox = resp.Inbox peerConn.mu.Unlock() + + if err := peerConn.conn.FlushTimeout(timeout); err != nil { + peerConn.Close() + return nil, err + } + n.mu.Lock() n.conns[peerConn] = struct{}{} n.mu.Unlock() @@ -303,17 +354,35 @@ func (n *natsStreamLayer) Accept() (net.Conn, error) { continue } - peerConn := n.newNATSConn(connect.ID) - peerConn.outbox = connect.Inbox + peerConn, err := n.newNATSConn(connect.ID) + if err != nil { + n.logger.Error("Unable to create connection to %q: %v", connect.ID, err) + continue + } // Setup inbox for peer. inbox := fmt.Sprintf(natsRequestInbox, n.localAddr.String(), nats.NewInbox()) - sub, err := n.conn.Subscribe(inbox, peerConn.msgHandler) + sub, err := peerConn.conn.SubscribeSync(inbox) if err != nil { n.logger.Error("Failed to create inbox for remote peer", "error", err) + peerConn.Close() continue } sub.SetPendingLimits(-1, -1) + + peerConn.mu.Lock() + peerConn.outbox = connect.Inbox + peerConn.sub = sub + shouldFlush := !peerConn.streamConn + peerConn.mu.Unlock() + + if shouldFlush { + if err := peerConn.conn.FlushTimeout(n.dfTimeout); err != nil { + peerConn.Close() + continue + } + } + // Reply to peer. resp := &connectResponseProto{Inbox: inbox} data, err := json.Marshal(resp) @@ -322,15 +391,14 @@ func (n *natsStreamLayer) Accept() (net.Conn, error) { } if err := n.conn.Publish(msg.Reply, data); err != nil { n.logger.Error("Failed to send connect response to remote peer", "error", err) - sub.Unsubscribe() + peerConn.Close() continue } if err := n.conn.FlushTimeout(n.dfTimeout); err != nil { n.logger.Error("Failed to flush connect response to remote peer", "error", err) - sub.Unsubscribe() + peerConn.Close() continue } - peerConn.sub = sub n.mu.Lock() n.conns[peerConn] = struct{}{} n.mu.Unlock() @@ -363,7 +431,7 @@ func (n *natsStreamLayer) Addr() net.Addr { // newNATSTransport creates a new raft.NetworkTransport implemented with NATS // as the transport layer. -func newNATSTransport(id string, conn *nats.Conn, timeout time.Duration, logOutput io.Writer) (*raft.NetworkTransport, error) { +func newNATSTransport(id string, conn *nats.Conn, timeout time.Duration, logOutput io.Writer, makeConn natsRaftConnCreator) (*raft.NetworkTransport, error) { if logOutput == nil { logOutput = os.Stderr } @@ -372,38 +440,40 @@ func newNATSTransport(id string, conn *nats.Conn, timeout time.Duration, logOutp Level: hclog.Debug, Output: logOutput, }) - return newNATSTransportWithLogger(id, conn, timeout, logger) + return createNATSTransport(id, conn, timeout, makeConn, logger, nil) } // newNATSTransportWithLogger creates a new raft.NetworkTransport implemented // with NATS as the transport layer using the provided Logger. func newNATSTransportWithLogger(id string, conn *nats.Conn, timeout time.Duration, logger hclog.Logger) (*raft.NetworkTransport, error) { - return createNATSTransport(id, conn, logger, timeout, func(stream raft.StreamLayer) *raft.NetworkTransport { - return raft.NewNetworkTransportWithLogger(stream, 3, timeout, logger) - }) + return createNATSTransport(id, conn, timeout, nil, logger, nil) } // newNATSTransportWithConfig returns a raft.NetworkTransport implemented // with NATS as the transport layer, using the given config struct. func newNATSTransportWithConfig(id string, conn *nats.Conn, config *raft.NetworkTransportConfig) (*raft.NetworkTransport, error) { - if config.Timeout == 0 { - config.Timeout = defaultTPortTimeout - } - return createNATSTransport(id, conn, config.Logger, config.Timeout, func(stream raft.StreamLayer) *raft.NetworkTransport { - config.Stream = stream - return raft.NewNetworkTransportWithConfig(config) - }) + return createNATSTransport(id, conn, 0, nil, nil, config) } -func createNATSTransport(id string, conn *nats.Conn, logger hclog.Logger, timeout time.Duration, - transportCreator func(stream raft.StreamLayer) *raft.NetworkTransport) (*raft.NetworkTransport, error) { +func createNATSTransport(id string, conn *nats.Conn, timeout time.Duration, makeConn natsRaftConnCreator, + logger hclog.Logger, config *raft.NetworkTransportConfig) (*raft.NetworkTransport, error) { - stream, err := newNATSStreamLayer(id, conn, logger, timeout) + if config != nil { + if config.Timeout == 0 { + config.Timeout = defaultTPortTimeout + } + timeout = config.Timeout + logger = config.Logger + } + stream, err := newNATSStreamLayer(id, conn, logger, timeout, makeConn) if err != nil { return nil, err } - - return transportCreator(stream), nil + if config != nil { + config.Stream = stream + return raft.NewNetworkTransportWithConfig(config), nil + } + return raft.NewNetworkTransportWithLogger(stream, 3, timeout, logger), nil } func min(x, y int64) int64 { diff --git a/server/raft_transport_test.go b/server/raft_transport_test.go index a68599e7..2ed701af 100644 --- a/server/raft_transport_test.go +++ b/server/raft_transport_test.go @@ -1,4 +1,4 @@ -// Copyright 2017-2019 The NATS Authors +// Copyright 2017-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -17,6 +17,7 @@ import ( "bytes" "errors" "fmt" + "io" "reflect" "strings" "sync" @@ -645,3 +646,222 @@ func TestRAFTTransportPooledConn(t *testing.T) { t.Fatal(err) } } + +func TestRAFTTransportConnReader(t *testing.T) { + s := runRaftTportServer() + defer s.Shutdown() + + nc1 := newNatsConnection(t) + defer nc1.Close() + stream1, err := newNATSStreamLayer("a", nc1, newTestLogger(t), 2*time.Second, nil) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + defer stream1.Close() + + // Check dial timeout: + start := time.Now() + if _, err := stream1.Dial("b", 50*time.Millisecond); err == nil { + t.Fatal("Expected failure") + } + dur := time.Since(start) + if dur > 250*time.Millisecond { + t.Fatalf("Should have timed out sooner than %v", dur) + } + + ch := make(chan *natsConn, 3) + go func() { + for { + c, err := stream1.Accept() + if err != nil { + ch <- nil + return + } + ch <- c.(*natsConn) + } + }() + + nc2 := newNatsConnection(t) + defer nc2.Close() + stream2, err := newNATSStreamLayer("b", nc2, newTestLogger(t), 2*time.Second, nil) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + defer stream2.Close() + + bToA, err := stream2.Dial("a", time.Second) + if err != nil { + t.Fatalf("Error dialing: %v", err) + } + defer bToA.Close() + + var fromB *natsConn + select { + case fromB = <-ch: + if fromB == nil { + t.Fatal("Error accepting connection") + } + case <-time.After(time.Second): + t.Fatal("Failed to get connection from B") + } + defer fromB.Close() + + if _, err := bToA.Write([]byte("Hello from A!")); err != nil { + t.Fatalf("Error on write: %v", err) + } + + var buf [1024]byte + for _, test := range []struct { + nb int + expected string + }{ + {5, "Hello"}, + {6, " from "}, + {2, "A!"}, + } { + t.Run("", func(t *testing.T) { + n, err := fromB.Read(buf[:test.nb]) + if err != nil || n != test.nb { + t.Fatalf("Unexpected error on read, n=%v err=%v", n, err) + } + if got := string(buf[:n]); got != test.expected { + t.Fatalf("Unexpected result: %q", got) + } + }) + } + + // Write empty message should not go out + if n, err := bToA.Write(nil); err != nil || n != 0 { + t.Fatalf("Write nil should return 0, nil, got %v and %v", n, err) + } + + // Write something else + if _, err := bToA.Write([]byte("msg")); err != nil { + t.Fatalf("Error on write: %v", err) + } + + // Consume all at once + n, err := fromB.Read(buf[:]) + if err != nil || n != 3 { + t.Fatalf("Unexpected error on read, n=%v err=%v", n, err) + } + if got := string(buf[:3]); got != "msg" { + t.Fatalf("Unexpected result: %q", got) + } + + // Now wait for a timeout + fromB.SetDeadline(time.Now().Add(100 * time.Millisecond)) + start = time.Now() + n, err = fromB.Read(buf[:]) + if err == nil || n != 0 { + t.Fatalf("Expected timeout, got err=%v n=%v", err, n) + } + + // Clear timeout + fromB.SetDeadline(time.Time{}) + + // Close the stream1's connection that should send an empty + // message to fromB connection to signal that it is closed. + if err := bToA.Close(); err != nil { + t.Fatalf("Error on close: %v", err) + } + // Call Write on close connection should fail too. + if _, err := bToA.Write([]byte("msg")); err != io.EOF { + t.Fatalf("Expected EOF on write, got: %v", err) + } + + // Expect an EOF error + for i := 0; i < 2; i++ { + n, err = fromB.Read(buf[:]) + if err != io.EOF || n != 0 { + t.Fatalf("Expected EOF, got err=%v n=%v", err, n) + } + } + + // Create a "new" connection. The way the stream was created, + // we actually reuse the connection from the stream, so no + // new connection is created. + tmp, err := stream1.newNATSConn("c") + if err != nil { + t.Fatalf("Error on create: %v", err) + } + // Now close and ensure that we did not close the stream connection. + tmp.Close() + + stream1.mu.Lock() + count := len(stream1.conns) + connected := stream1.conn.IsConnected() + stream1.mu.Unlock() + if count != 0 { + t.Fatalf("Expected stream to have no connection, got %v", count) + } + if !connected { + t.Fatal("Stream connection should not have been closed") + } + + // Now create a stream that will create a new NATS connection. + nc3 := newNatsConnection(t) + defer nc3.Close() + stream3, err := newNATSStreamLayer("c", nc3, newTestLogger(t), 2*time.Second, func(name string) (*nats.Conn, error) { + return nats.Connect(nats.DefaultURL) + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + defer stream3.Close() + + cToA, err := stream3.Dial("a", time.Second) + if err != nil { + t.Fatalf("Error dialing: %v", err) + } + defer cToA.Close() + + var fromC *natsConn + select { + case fromC = <-ch: + if fromC == nil { + t.Fatal("Error accepting connection") + } + case <-time.After(time.Second): + t.Fatal("Failed to get connection from C") + } + defer fromC.Close() + + if _, err := cToA.Write([]byte("from C")); err != nil { + t.Fatalf("Error on write: %v", err) + } + + n, err = fromC.Read(buf[:]) + if err != nil { + t.Fatalf("Error on read: %v", err) + } + if got := string(buf[:n]); got != "from C" { + t.Fatalf("Unexpected read: %q", got) + } + + // Close cToA + if err := cToA.Close(); err != nil { + t.Fatalf("Error on close: %v", err) + } + + // The connection "fromC" should be closed too. + n, err = fromC.Read(buf[:]) + if n != 0 || err != io.EOF { + t.Fatalf("Expected fromC connection to close, got n=%v err=%v", n, err) + } + + // Close all streams. + stream3.Close() + stream2.Close() + stream1.Close() + + // The Accept should return and we should get nil + select { + case c := <-ch: + if c != nil { + t.Fatalf("Should have gotten nil, got %v", c) + } + case <-time.After(time.Second): + t.Fatal("Accept() did not exit") + } +} diff --git a/server/server.go b/server/server.go index 5863e6c9..d2fdb3bb 100644 --- a/server/server.go +++ b/server/server.go @@ -1,4 +1,4 @@ -// Copyright 2016-2020 The NATS Authors +// Copyright 2016-2021 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -760,6 +760,8 @@ type StanServer struct { pingResponseOKBytes []byte pingResponseInvalidClientBytes []byte + // List of server URLs built on startup + serverURLs []string // If using an external server, capture the URL that was given for return in ClientURL(). providedServerURL string } @@ -1497,10 +1499,8 @@ func (s *StanServer) createNatsClientConn(name string) (*nats.Conn, error) { ncOpts.TLSConfig.ServerName, s.opts.TLSServerName) } - ncOpts.Servers, err = s.buildServerURLs() - if err != nil { - return nil, err - } + ncOpts.Servers = s.serverURLs + // From executable, these are provided through the command line `-user ...`, // so they take precedence over streaming's configuration file ncOpts.User = s.natsOpts.Username @@ -1812,6 +1812,12 @@ func RunServerWithOpts(stanOpts *Options, natsOpts *server.Options) (newServer * return nil, err } } + // Build server URLs + urls, err := s.buildServerURLs() + if err != nil { + return nil, err + } + s.serverURLs = urls // Create our connections if err := s.createNatsConnections(); err != nil { return nil, err diff --git a/server/timeout_reader.go b/server/timeout_reader.go deleted file mode 100644 index c62200f5..00000000 --- a/server/timeout_reader.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright 2017-2018 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "bufio" - "errors" - "io" - "runtime" - "time" -) - -const bufferSize = 32768 - -// ErrTimeout reports a read timeout error -var ErrTimeout = errors.New("natslog: read timeout") - -type timeoutReader struct { - b *bufio.Reader - t time.Time - ch <-chan error - closeFunc func() error -} - -func newTimeoutReader(r io.ReadCloser) *timeoutReader { - return &timeoutReader{ - b: bufio.NewReaderSize(r, bufferSize), - closeFunc: func() error { return r.Close() }, - } -} - -// SetDeadline sets the deadline for all future Read calls. -func (r *timeoutReader) SetDeadline(t time.Time) { - r.t = t -} - -func (r *timeoutReader) Read(b []byte) (n int, err error) { - if r.ch == nil { - if r.t.IsZero() || r.b.Buffered() > 0 { - return r.b.Read(b) - } - ch := make(chan error, 1) - r.ch = ch - go func() { - _, err := r.b.Peek(1) - ch <- err - }() - runtime.Gosched() - } - if r.t.IsZero() { - err = <-r.ch // Block - } else { - select { - case err = <-r.ch: // Poll - default: - select { - case err = <-r.ch: // Timeout - case <-time.After(time.Until(r.t)): - return 0, ErrTimeout - } - } - } - r.ch = nil - if r.b.Buffered() > 0 { - n, _ = r.b.Read(b) - } - return -} - -func (r *timeoutReader) Close() error { - return r.closeFunc() -} diff --git a/server/timeout_reader_test.go b/server/timeout_reader_test.go deleted file mode 100644 index 7cb7e144..00000000 --- a/server/timeout_reader_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2017-2018 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package server - -import ( - "io" - "testing" - "time" -) - -func TestTimeoutReader(t *testing.T) { - reader, writer := io.Pipe() - r := newTimeoutReader(reader) - - r.SetDeadline(time.Now().Add(time.Millisecond)) - n, err := r.Read(make([]byte, 10)) - if err != ErrTimeout { - t.Fatal("expected ErrTimeout") - } - if n != 0 { - t.Fatalf("expected: 0\ngot: %d", n) - } - - writer.Write([]byte("hello")) - r.SetDeadline(time.Now().Add(time.Millisecond)) - buf := make([]byte, 5) - n, err = r.Read(buf) - if err != nil { - t.Fatalf("error: %v", err) - } - if n != 5 { - t.Fatalf("expected: 5\ngot: %d", n) - } - if string(buf) != "hello" { - t.Fatalf("expected: hello\ngot: %s", buf) - } - - if err := r.Close(); err != nil { - t.Fatalf("error: %v", err) - } - - n, err = r.Read(make([]byte, 5)) - if err != io.ErrClosedPipe { - t.Fatalf("expected: ErrClosedPipe\ngot: %v", err) - } - if n != 0 { - t.Fatalf("expected: 0\ngot: %d", n) - } -} diff --git a/test/configs/test_parse.conf b/test/configs/test_parse.conf index 0e1ca973..47225b7b 100644 --- a/test/configs/test_parse.conf +++ b/test/configs/test_parse.conf @@ -94,6 +94,7 @@ streaming: { allow_add_remove_node: true bolt_free_list_sync: true bolt_free_list_array: true + nodes_connections: true } sql: {