Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure we use correct MSG prefix #978

Merged
merged 2 commits into from May 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/client.go
Expand Up @@ -2567,6 +2567,8 @@ sendToRoutesOrLeafs:
kind := rt.sub.client.kind
mh := c.msgb[:msgHeadProtoLen]
if kind == ROUTER {
// Router (and Gateway) nodes are RMSG. Set here since leafnodes may rewrite.
mh[0] = 'R'
kozlovic marked this conversation as resolved.
Show resolved Hide resolved
mh = append(mh, acc.Name...)
mh = append(mh, ' ')
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/const.go
Expand Up @@ -40,7 +40,7 @@ var (

const (
// VERSION is the current version for the server.
VERSION = "2.0.0-RC8"
VERSION = "2.0.0-RC9"

// PROTO is the currently supported protocol.
// 0 was the original
Expand Down
3 changes: 3 additions & 0 deletions server/gateway.go
Expand Up @@ -2139,6 +2139,9 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
// Get a subscription from the pool
sub := subPool.Get().(*subscription)

// Make sure we are an 'R' proto
c.msgb[0] = 'R'

// Check if the subject is on "$GR.<cluster hash>.",
// and if so, send to that GW regardless of its
// interest on the real subject (that is, skip the
Expand Down
6 changes: 6 additions & 0 deletions server/leafnode.go
Expand Up @@ -503,6 +503,12 @@ func (s *Server) createLeafNode(conn net.Conn, remote *leafNodeCfg) *client {

url := c.leaf.remote.getCurrentURL()
host, _, _ := net.SplitHostPort(url.Host)
// We need to check if this host is an IP. If so, we probably
// had this advertised to us an should use the configured host
// name for the TLS server name.
if net.ParseIP(host) != nil {
host, _, _ = net.SplitHostPort(c.leaf.remote.RemoteLeafOpts.URL.Host)
}
tlsConfig.ServerName = host

c.nc = tls.Client(c.nc, tlsConfig)
Expand Down
68 changes: 68 additions & 0 deletions test/leafnode_test.go
Expand Up @@ -1831,3 +1831,71 @@ func TestLeafNodeSwitchGatewayToInterestModeOnly(t *testing.T) {
leafSend("PING\r\n")
leafExpect(pongRe)
}

// The MSG proto for routes and gateways is RMSG, and we have an
// optimization that a scratch buffer has RMSG and when doing a
// client we just start at scratch[1]. For leaf nodes its LMSG and we
// rewrite scratch[0], but never reset it which causes protocol
// errors when used with routes or gateways after use to send
// to a leafnode.
// We will create a server with a leafnode connection and a route
// and a gateway connection.

// route connections to simulate.
func TestLeafNodeResetsMSGProto(t *testing.T) {
opts := testDefaultOptionsForLeafNodes()
opts.Cluster.Host = opts.Host
opts.Cluster.Port = -1
opts.Gateway.Name = "lproto"
opts.Gateway.Host = opts.Host
opts.Gateway.Port = -1
opts.Accounts = []*server.Account{server.NewAccount("$SYS")}
opts.SystemAccount = "$SYS"

s := RunServer(opts)
defer s.Shutdown()

lc := createLeafConn(t, opts.LeafNode.Host, opts.LeafNode.Port)
defer lc.Close()

leafSend, leafExpect := setupConn(t, lc)

gw := createGatewayConn(t, opts.Gateway.Host, opts.Gateway.Port)
defer gw.Close()

gwSend, gwExpect := setupGatewayConn(t, gw, "A", "lproto")
gwSend("PING\r\n")
gwExpect(pongRe)

// Now setup interest in the leaf node for 'foo'.
leafSend("LS+ foo\r\nPING\r\n")
leafExpect(pongRe)

// Send msg from the gateway.
gwSend("RMSG $G foo 2\r\nok\r\nPING\r\n")
gwExpect(pongRe)

leafExpect(lmsgRe)

// At this point the gw inside our main server's scratch buffer is LMSG. When we do
// same with a connected route with interest it should fail.
rc := createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port)
defer rc.Close()
checkInfoMsg(t, rc)
routeSend, routeExpect := setupRouteEx(t, rc, opts, "RC")

routeSend("RS+ $G foo\r\nPING\r\n")
routeExpect(pongRe)

// This is for the route interest we just created.
leafExpect(lsubRe)

// Send msg from the gateway.
gwSend("RMSG $G foo 2\r\nok\r\nPING\r\n")
gwExpect(pongRe)

leafExpect(lmsgRe)

// Now make sure we get it on route. This will fail with the proto bug.
routeExpect(rmsgRe)
}