Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Websocket for Leaf Node connections #1858

Merged
merged 3 commits into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
150 changes: 134 additions & 16 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package server
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"regexp"
"runtime"
"strconv"
Expand Down Expand Up @@ -597,7 +599,11 @@ func (c *client) initClient() {
case GATEWAY:
c.ncs.Store(fmt.Sprintf("%s - gid:%d", conn, c.cid))
case LEAF:
c.ncs.Store(fmt.Sprintf("%s - lid:%d", conn, c.cid))
var ws string
if c.isWebsocket() {
ws = "_ws"
}
c.ncs.Store(fmt.Sprintf("%s - lid%s:%d", conn, ws, c.cid))
case SYSTEM:
c.ncs.Store("SYSTEM")
case JETSTREAM:
Expand Down Expand Up @@ -1018,6 +1024,10 @@ func (c *client) readLoop(pre []byte) {
// Last per-account-cache check for closed subscriptions
lpacc := time.Now()
acc := c.acc
var masking bool
if ws {
masking = c.ws.maskread
}
c.mu.Unlock()

defer func() {
Expand All @@ -1041,21 +1051,26 @@ func (c *client) readLoop(pre []byte) {

var wsr *wsReadInfo
if ws {
wsr = &wsReadInfo{}
wsr = &wsReadInfo{mask: masking}
wsr.init()
}

// If we have a pre buffer parse that first.
if len(pre) > 0 {
c.parse(pre)
}

for {
n, err := nc.Read(b)
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
return
var n int
var err error

// If we have a pre buffer parse that first.
if len(pre) > 0 {
b = pre
n = len(pre)
pre = nil
} else {
n, err = nc.Read(b)
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
return
}
}
if ws {
bufs, err = c.wsRead(wsr, nc, b[:n])
Expand Down Expand Up @@ -1154,8 +1169,15 @@ func (c *client) readLoop(pre []byte) {
}
// re-snapshot the account since it can change during reload, etc.
acc = c.acc
// Refresh nc because in some cases, we have upgraded c.nc to TLS.
nc = c.nc
c.mu.Unlock()

// Connection was closed
if nc == nil {
return
}

if dur := time.Since(start); dur >= readLoopReportThreshold {
c.Warnf("Readloop processing time: %v", dur)
}
Expand Down Expand Up @@ -1415,12 +1437,12 @@ func (c *client) markConnAsClosed(reason ClosedState) {
if !skipFlush && c.isWebsocket() && !c.ws.closeSent {
c.wsEnqueueCloseMessage(reason)
}
// Be consistent with the creation: for routes and gateways,
// Be consistent with the creation: for routes, gateways and leaf,
// we use Noticef on create, so use that too for delete.
if c.srv != nil {
if c.kind == ROUTER || c.kind == GATEWAY {
if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
c.Noticef("%s connection closed: %s", c.typeString(), reason)
} else { // Client, System, Jetstream, Account and Leafnode connections.
} else { // Client, System, Jetstream, and Account connections.
c.Debugf("%s connection closed: %s", c.typeString(), reason)
}
}
Expand Down Expand Up @@ -1501,7 +1523,7 @@ func (c *client) processInfo(arg []byte) error {
case GATEWAY:
c.processGatewayInfo(&info)
case LEAF:
return c.processLeafnodeInfo(&info)
c.processLeafnodeInfo(&info)
}
return nil
}
Expand Down Expand Up @@ -4672,6 +4694,102 @@ func (c *client) getClientInfo(detailed bool) *ClientInfo {
return &ci
}

func (c *client) doTLSServerHandshake(typ string, tlsConfig *tls.Config, timeout float64) error {
_, err := c.doTLSHandshake(typ, false, nil, tlsConfig, _EMPTY_, timeout)
return err
}

func (c *client) doTLSClientHandshake(typ string, url *url.URL, tlsConfig *tls.Config, tlsName string, timeout float64) (bool, error) {
return c.doTLSHandshake(typ, true, url, tlsConfig, tlsName, timeout)
}

// Performs eithe server or client side (if solicit is true) TLS Handshake.
// On error, the TLS handshake error has been logged and the connection
// has been closed.
//
// Lock is held on entry.
func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfig *tls.Config, tlsName string, timeout float64) (bool, error) {
var host string
var resetTLSName bool
var err error

// Capture kind for some debug/error statements.
kind := c.kind

// If we solicited, we will act like the client, otherwise the server.
if solicit {
c.Debugf("Starting TLS %s client handshake", typ)
if tlsConfig.ServerName == _EMPTY_ {
// If the given url is a hostname, use this hostname for the
// ServerName. If it is an IP, use the cfg's tlsName. If none
// is available, resort to current IP.
host = url.Hostname()
if tlsName != _EMPTY_ && net.ParseIP(host) != nil {
host = tlsName
}
tlsConfig.ServerName = host
}
c.nc = tls.Client(c.nc, tlsConfig)
} else {
if kind == CLIENT {
c.Debugf("Starting TLS client connection handshake")
} else {
c.Debugf("Starting TLS %s server handshake", typ)
}
c.nc = tls.Server(c.nc, tlsConfig)
}

conn := c.nc.(*tls.Conn)

// Setup the timeout
ttl := secondsToDuration(timeout)
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
conn.SetReadDeadline(time.Now().Add(ttl))

c.mu.Unlock()
if err = conn.Handshake(); err != nil {
if solicit {
// Based on type of error, possibly clear the saved tlsName
// See: https://github.com/nats-io/nats-server/issues/1256
if _, ok := err.(x509.HostnameError); ok {
if host == tlsName {
resetTLSName = true
}
}
}
if kind == CLIENT {
c.Errorf("TLS handshake error: %v", err)
} else {
c.Errorf("TLS %s handshake error: %v", typ, err)
}
c.closeConnection(TLSHandshakeError)

// Grab the lock before returning since the caller was holding the lock on entry
c.mu.Lock()
// Returning any error is fine. Since the connection is closed ErrConnectionClosed
// is appropriate.
return resetTLSName, ErrConnectionClosed
}

// Reset the read deadline
conn.SetReadDeadline(time.Time{})

// Re-Grab lock
c.mu.Lock()

// To be consistent with client, set this flag to indicate that handshake is done
c.flags.set(handshakeComplete)

// The connection still may have been closed on success handshake due
// to a race with tls timeout. If that the case, return error indicating
// that the connection is closed.
if err == nil && c.isClosed() {
err = ErrConnectionClosed
}

return false, err
}

// getRAwAuthUser returns the raw auth user for the client.
// Lock should be held.
func (c *client) getRawAuthUser() string {
Expand Down
81 changes: 20 additions & 61 deletions server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"math/rand"
Expand Down Expand Up @@ -733,11 +732,6 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
// Are we creating the gateway based on the configuration
solicit := cfg != nil
var tlsRequired bool
if solicit {
tlsRequired = cfg.TLSConfig != nil
} else {
tlsRequired = opts.Gateway.TLSConfig != nil
}

s.gateway.RLock()
infoJSON := s.gateway.infoJSON
Expand All @@ -749,86 +743,51 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
c.gw = &gateway{}
if solicit {
// This is an outbound gateway connection
cfg.RLock()
tlsRequired = cfg.TLSConfig != nil
cfgName := cfg.Name
cfg.RUnlock()
c.gw.outbound = true
c.gw.name = cfg.Name
c.gw.name = cfgName
c.gw.cfg = cfg
cfg.bumpConnAttempts()
// Since we are delaying the connect until after receiving
// the remote's INFO protocol, save the URL we need to connect to.
c.gw.connectURL = url

c.Noticef("Creating outbound gateway connection to %q", cfg.Name)
c.Noticef("Creating outbound gateway connection to %q", cfgName)
} else {
c.flags.set(expectConnect)
// Inbound gateway connection
c.Noticef("Processing inbound gateway connection")
// Check if TLS is required for inbound GW connections.
tlsRequired = opts.Gateway.TLSConfig != nil
}

// Check for TLS
if tlsRequired {
var host string
var tlsConfig *tls.Config
var tlsName string
var timeout float64
// If we solicited, we will act like the client, otherwise the server.

if solicit {
c.Debugf("Starting TLS gateway client handshake")
cfg.RLock()
tlsName := cfg.tlsName
tlsConfig := cfg.TLSConfig.Clone()
tlsName = cfg.tlsName
tlsConfig = cfg.TLSConfig.Clone()
timeout = cfg.TLSTimeout
cfg.RUnlock()
if tlsConfig.ServerName == "" {
// If the given url is a hostname, use this hostname for the
// ServerName. If it is an IP, use the cfg's tlsName. If none
// is available, resort to current IP.
host = url.Hostname()
if tlsName != "" && net.ParseIP(host) != nil {
host = tlsName
}
tlsConfig.ServerName = host
}
c.nc = tls.Client(c.nc, tlsConfig)
} else {
c.Debugf("Starting TLS gateway server handshake")
c.nc = tls.Server(c.nc, opts.Gateway.TLSConfig)
tlsConfig = opts.Gateway.TLSConfig
timeout = opts.Gateway.TLSTimeout
}

conn := c.nc.(*tls.Conn)

// Setup the timeout
ttl := secondsToDuration(timeout)
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
conn.SetReadDeadline(time.Now().Add(ttl))

c.mu.Unlock()
if err := conn.Handshake(); err != nil {
if solicit {
// Based on type of error, possibly clear the saved tlsName
// See: https://github.com/nats-io/nats-server/issues/1256
if _, ok := err.(x509.HostnameError); ok {
cfg.Lock()
if host == cfg.tlsName {
cfg.tlsName = ""
}
cfg.Unlock()
}
// Perform (either server or client side) TLS handshake.
if resetTLSName, err := c.doTLSHandshake("gateway", solicit, url, tlsConfig, tlsName, timeout); err != nil {
if resetTLSName {
cfg.Lock()
cfg.tlsName = _EMPTY_
cfg.Unlock()
}
c.Errorf("TLS gateway handshake error: %v", err)
c.sendErr("Secure Connection - TLS Required")
c.closeConnection(TLSHandshakeError)
return
}
// Reset the read deadline
conn.SetReadDeadline(time.Time{})

// Re-Grab lock
c.mu.Lock()

// To be consistent with client, set this flag to indicate that handshake is done
c.flags.set(handshakeComplete)

// Verify that the connection did not go away while we released the lock.
if c.isClosed() {
c.mu.Unlock()
return
}
Expand Down