Skip to content

Commit

Permalink
Merge pull request #1858 from nats-io/ln_ws
Browse files Browse the repository at this point in the history
[ADDED] Websocket for Leaf Node connections
  • Loading branch information
kozlovic committed Jan 29, 2021
2 parents 3c49f08 + 9587bf8 commit 06c63b5
Show file tree
Hide file tree
Showing 10 changed files with 1,508 additions and 651 deletions.
150 changes: 134 additions & 16 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package server
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"regexp"
"runtime"
"strconv"
Expand Down Expand Up @@ -597,7 +599,11 @@ func (c *client) initClient() {
case GATEWAY:
c.ncs.Store(fmt.Sprintf("%s - gid:%d", conn, c.cid))
case LEAF:
c.ncs.Store(fmt.Sprintf("%s - lid:%d", conn, c.cid))
var ws string
if c.isWebsocket() {
ws = "_ws"
}
c.ncs.Store(fmt.Sprintf("%s - lid%s:%d", conn, ws, c.cid))
case SYSTEM:
c.ncs.Store("SYSTEM")
case JETSTREAM:
Expand Down Expand Up @@ -1018,6 +1024,10 @@ func (c *client) readLoop(pre []byte) {
// Last per-account-cache check for closed subscriptions
lpacc := time.Now()
acc := c.acc
var masking bool
if ws {
masking = c.ws.maskread
}
c.mu.Unlock()

defer func() {
Expand All @@ -1041,21 +1051,26 @@ func (c *client) readLoop(pre []byte) {

var wsr *wsReadInfo
if ws {
wsr = &wsReadInfo{}
wsr = &wsReadInfo{mask: masking}
wsr.init()
}

// If we have a pre buffer parse that first.
if len(pre) > 0 {
c.parse(pre)
}

for {
n, err := nc.Read(b)
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
return
var n int
var err error

// If we have a pre buffer parse that first.
if len(pre) > 0 {
b = pre
n = len(pre)
pre = nil
} else {
n, err = nc.Read(b)
// If we have any data we will try to parse and exit at the end.
if n == 0 && err != nil {
c.closeConnection(closedStateForErr(err))
return
}
}
if ws {
bufs, err = c.wsRead(wsr, nc, b[:n])
Expand Down Expand Up @@ -1154,8 +1169,15 @@ func (c *client) readLoop(pre []byte) {
}
// re-snapshot the account since it can change during reload, etc.
acc = c.acc
// Refresh nc because in some cases, we have upgraded c.nc to TLS.
nc = c.nc
c.mu.Unlock()

// Connection was closed
if nc == nil {
return
}

if dur := time.Since(start); dur >= readLoopReportThreshold {
c.Warnf("Readloop processing time: %v", dur)
}
Expand Down Expand Up @@ -1415,12 +1437,12 @@ func (c *client) markConnAsClosed(reason ClosedState) {
if !skipFlush && c.isWebsocket() && !c.ws.closeSent {
c.wsEnqueueCloseMessage(reason)
}
// Be consistent with the creation: for routes and gateways,
// Be consistent with the creation: for routes, gateways and leaf,
// we use Noticef on create, so use that too for delete.
if c.srv != nil {
if c.kind == ROUTER || c.kind == GATEWAY {
if c.kind == ROUTER || c.kind == GATEWAY || c.kind == LEAF {
c.Noticef("%s connection closed: %s", c.typeString(), reason)
} else { // Client, System, Jetstream, Account and Leafnode connections.
} else { // Client, System, Jetstream, and Account connections.
c.Debugf("%s connection closed: %s", c.typeString(), reason)
}
}
Expand Down Expand Up @@ -1501,7 +1523,7 @@ func (c *client) processInfo(arg []byte) error {
case GATEWAY:
c.processGatewayInfo(&info)
case LEAF:
return c.processLeafnodeInfo(&info)
c.processLeafnodeInfo(&info)
}
return nil
}
Expand Down Expand Up @@ -4672,6 +4694,102 @@ func (c *client) getClientInfo(detailed bool) *ClientInfo {
return &ci
}

func (c *client) doTLSServerHandshake(typ string, tlsConfig *tls.Config, timeout float64) error {
_, err := c.doTLSHandshake(typ, false, nil, tlsConfig, _EMPTY_, timeout)
return err
}

func (c *client) doTLSClientHandshake(typ string, url *url.URL, tlsConfig *tls.Config, tlsName string, timeout float64) (bool, error) {
return c.doTLSHandshake(typ, true, url, tlsConfig, tlsName, timeout)
}

// Performs eithe server or client side (if solicit is true) TLS Handshake.
// On error, the TLS handshake error has been logged and the connection
// has been closed.
//
// Lock is held on entry.
func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfig *tls.Config, tlsName string, timeout float64) (bool, error) {
var host string
var resetTLSName bool
var err error

// Capture kind for some debug/error statements.
kind := c.kind

// If we solicited, we will act like the client, otherwise the server.
if solicit {
c.Debugf("Starting TLS %s client handshake", typ)
if tlsConfig.ServerName == _EMPTY_ {
// If the given url is a hostname, use this hostname for the
// ServerName. If it is an IP, use the cfg's tlsName. If none
// is available, resort to current IP.
host = url.Hostname()
if tlsName != _EMPTY_ && net.ParseIP(host) != nil {
host = tlsName
}
tlsConfig.ServerName = host
}
c.nc = tls.Client(c.nc, tlsConfig)
} else {
if kind == CLIENT {
c.Debugf("Starting TLS client connection handshake")
} else {
c.Debugf("Starting TLS %s server handshake", typ)
}
c.nc = tls.Server(c.nc, tlsConfig)
}

conn := c.nc.(*tls.Conn)

// Setup the timeout
ttl := secondsToDuration(timeout)
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
conn.SetReadDeadline(time.Now().Add(ttl))

c.mu.Unlock()
if err = conn.Handshake(); err != nil {
if solicit {
// Based on type of error, possibly clear the saved tlsName
// See: https://github.com/nats-io/nats-server/issues/1256
if _, ok := err.(x509.HostnameError); ok {
if host == tlsName {
resetTLSName = true
}
}
}
if kind == CLIENT {
c.Errorf("TLS handshake error: %v", err)
} else {
c.Errorf("TLS %s handshake error: %v", typ, err)
}
c.closeConnection(TLSHandshakeError)

// Grab the lock before returning since the caller was holding the lock on entry
c.mu.Lock()
// Returning any error is fine. Since the connection is closed ErrConnectionClosed
// is appropriate.
return resetTLSName, ErrConnectionClosed
}

// Reset the read deadline
conn.SetReadDeadline(time.Time{})

// Re-Grab lock
c.mu.Lock()

// To be consistent with client, set this flag to indicate that handshake is done
c.flags.set(handshakeComplete)

// The connection still may have been closed on success handshake due
// to a race with tls timeout. If that the case, return error indicating
// that the connection is closed.
if err == nil && c.isClosed() {
err = ErrConnectionClosed
}

return false, err
}

// getRAwAuthUser returns the raw auth user for the client.
// Lock should be held.
func (c *client) getRawAuthUser() string {
Expand Down
81 changes: 20 additions & 61 deletions server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"bytes"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"math/rand"
Expand Down Expand Up @@ -733,11 +732,6 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
// Are we creating the gateway based on the configuration
solicit := cfg != nil
var tlsRequired bool
if solicit {
tlsRequired = cfg.TLSConfig != nil
} else {
tlsRequired = opts.Gateway.TLSConfig != nil
}

s.gateway.RLock()
infoJSON := s.gateway.infoJSON
Expand All @@ -749,86 +743,51 @@ func (s *Server) createGateway(cfg *gatewayCfg, url *url.URL, conn net.Conn) {
c.gw = &gateway{}
if solicit {
// This is an outbound gateway connection
cfg.RLock()
tlsRequired = cfg.TLSConfig != nil
cfgName := cfg.Name
cfg.RUnlock()
c.gw.outbound = true
c.gw.name = cfg.Name
c.gw.name = cfgName
c.gw.cfg = cfg
cfg.bumpConnAttempts()
// Since we are delaying the connect until after receiving
// the remote's INFO protocol, save the URL we need to connect to.
c.gw.connectURL = url

c.Noticef("Creating outbound gateway connection to %q", cfg.Name)
c.Noticef("Creating outbound gateway connection to %q", cfgName)
} else {
c.flags.set(expectConnect)
// Inbound gateway connection
c.Noticef("Processing inbound gateway connection")
// Check if TLS is required for inbound GW connections.
tlsRequired = opts.Gateway.TLSConfig != nil
}

// Check for TLS
if tlsRequired {
var host string
var tlsConfig *tls.Config
var tlsName string
var timeout float64
// If we solicited, we will act like the client, otherwise the server.

if solicit {
c.Debugf("Starting TLS gateway client handshake")
cfg.RLock()
tlsName := cfg.tlsName
tlsConfig := cfg.TLSConfig.Clone()
tlsName = cfg.tlsName
tlsConfig = cfg.TLSConfig.Clone()
timeout = cfg.TLSTimeout
cfg.RUnlock()
if tlsConfig.ServerName == "" {
// If the given url is a hostname, use this hostname for the
// ServerName. If it is an IP, use the cfg's tlsName. If none
// is available, resort to current IP.
host = url.Hostname()
if tlsName != "" && net.ParseIP(host) != nil {
host = tlsName
}
tlsConfig.ServerName = host
}
c.nc = tls.Client(c.nc, tlsConfig)
} else {
c.Debugf("Starting TLS gateway server handshake")
c.nc = tls.Server(c.nc, opts.Gateway.TLSConfig)
tlsConfig = opts.Gateway.TLSConfig
timeout = opts.Gateway.TLSTimeout
}

conn := c.nc.(*tls.Conn)

// Setup the timeout
ttl := secondsToDuration(timeout)
time.AfterFunc(ttl, func() { tlsTimeout(c, conn) })
conn.SetReadDeadline(time.Now().Add(ttl))

c.mu.Unlock()
if err := conn.Handshake(); err != nil {
if solicit {
// Based on type of error, possibly clear the saved tlsName
// See: https://github.com/nats-io/nats-server/issues/1256
if _, ok := err.(x509.HostnameError); ok {
cfg.Lock()
if host == cfg.tlsName {
cfg.tlsName = ""
}
cfg.Unlock()
}
// Perform (either server or client side) TLS handshake.
if resetTLSName, err := c.doTLSHandshake("gateway", solicit, url, tlsConfig, tlsName, timeout); err != nil {
if resetTLSName {
cfg.Lock()
cfg.tlsName = _EMPTY_
cfg.Unlock()
}
c.Errorf("TLS gateway handshake error: %v", err)
c.sendErr("Secure Connection - TLS Required")
c.closeConnection(TLSHandshakeError)
return
}
// Reset the read deadline
conn.SetReadDeadline(time.Time{})

// Re-Grab lock
c.mu.Lock()

// To be consistent with client, set this flag to indicate that handshake is done
c.flags.set(handshakeComplete)

// Verify that the connection did not go away while we released the lock.
if c.isClosed() {
c.mu.Unlock()
return
}
Expand Down

0 comments on commit 06c63b5

Please sign in to comment.