Skip to content

Commit

Permalink
Added websocket support
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Apr 23, 2021
1 parent 5bcdd33 commit 995f671
Show file tree
Hide file tree
Showing 3 changed files with 1,732 additions and 8 deletions.
66 changes: 58 additions & 8 deletions nats.go
Expand Up @@ -426,6 +426,10 @@ type Options struct {
// is established, and if a ClosedHandler is set, it will be invoked if
// it fails to connect (after exhausting the MaxReconnect attempts).
RetryOnFailedConnect bool

// For websocket connections, indicates to the server that the connection
// supports compression. If the server does too, then data will be compressed.
Compression bool
}

const (
Expand Down Expand Up @@ -484,6 +488,7 @@ type Conn struct {
pout int
ar bool // abort reconnect
rqch chan struct{}
ws bool // true if a websocket connection

// New style response handler
respSub string // The wildcard subject
Expand Down Expand Up @@ -691,6 +696,8 @@ type MsgHandler func(msg *Msg)
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
// Comma separated arrays are also supported, e.g. urlA, urlB.
// Options start with the defaults but can be overridden.
// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
func Connect(url string, options ...Option) (*Conn, error) {
opts := GetDefaultOptions()
opts.Servers = processUrlString(url)
Expand Down Expand Up @@ -1085,6 +1092,15 @@ func RetryOnFailedConnect(retry bool) Option {
}
}

// Compression is an Option to indicate if this connection supports
// compression. Currently only supported for Websocket connections.
func Compression(enabled bool) Option {
return func(o *Options) error {
o.Compression = enabled
return nil
}
}

// Handler processing

// SetDisconnectHandler will set the disconnect event handler.
Expand Down Expand Up @@ -1375,6 +1391,12 @@ func (nc *Conn) setupServerPool() error {

// Helper function to return scheme
func (nc *Conn) connScheme() string {
if nc.ws {
if nc.Opts.Secure {
return wsSchemeTLS
}
return wsScheme
}
if nc.Opts.Secure {
return tlsScheme
}
Expand Down Expand Up @@ -1411,6 +1433,16 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
sURL += defaultPortString
}

isWS := isWebsocketScheme(u)
// We don't support mix and match of websocket and non websocket URLs.
// If this is the first URL, then we accept and switch the global state
// to websocket. After that, we will know how to reject mixed URLs.
if len(nc.srvPool) == 0 {
nc.ws = isWS
} else if isWS && !nc.ws || !isWS && nc.ws {
return fmt.Errorf("mixing of websocket and non websocket URLs is not allowed")
}

var tlsName string
if implicit {
curl := nc.current.url
Expand Down Expand Up @@ -1506,11 +1538,14 @@ func (w *natsWriter) writeDirect(strs ...string) error {

func (w *natsWriter) flush() error {
// If a pending buffer is set, we don't flush. Code that needs to
// write directly to the socket, by-passing buffers during (re)connect
// use the writeDirect() API.
if w.pending != nil || len(w.bufs) == 0 {
// write directly to the socket, by-passing buffers during (re)connect,
// will use the writeDirect() API.
if w.pending != nil {
return nil
}
// Do not skip calling w.w.Write() here if len(w.bufs) is 0 because
// the actual writer (if websocket for instance) may have things
// to do such as sending control frames, etc..
_, err := w.w.Write(w.bufs)
w.bufs = w.bufs[:0]
return err
Expand Down Expand Up @@ -1638,6 +1673,11 @@ func (nc *Conn) createConn() (err error) {
return err
}

// If scheme starts with "ws" then branch out to websocket code.
if isWebsocketScheme(u) {
return nc.wsInitHandshake(u)
}

// Reset reader/writer to this new TCP connection
nc.bindToNewConn()
return nil
Expand Down Expand Up @@ -1926,6 +1966,12 @@ func (nc *Conn) processExpectedInfo() error {
return ErrNkeysNotSupported
}

// For websocket connections, we already switched to TLS if need be,
// so we are done here.
if nc.ws {
return nil
}

return nc.checkForSecure()
}

Expand Down Expand Up @@ -4417,6 +4463,12 @@ func (nc *Conn) close(status Status, doCBs bool, err error) {
// all blocking calls, such as Flush() and NextMsg()
func (nc *Conn) Close() {
if nc != nil {
// This will be a no-op if the connection was not websocket.
// We do this here as opposed to inside close() because we want
// to do this only for the final user-driven close of the client.
// Otherwise, we would need to change close() to pass a boolean
// indicating that this is the case.
nc.wsClose()
nc.close(CLOSED, !nc.Opts.NoCallbacksAfterClientClose, nil)
}
}
Expand Down Expand Up @@ -4456,7 +4508,7 @@ func (nc *Conn) drainConnection() {
if nc.isConnecting() || nc.isReconnecting() {
nc.mu.Unlock()
// Move to closed state.
nc.close(CLOSED, true, nil)
nc.Close()
return
}

Expand Down Expand Up @@ -4536,12 +4588,10 @@ func (nc *Conn) drainConnection() {
err := nc.FlushTimeout(5 * time.Second)
if err != nil {
pushErr(err)
nc.close(CLOSED, true, nil)
return
}

// Move to closed state.
nc.close(CLOSED, true, nil)
nc.Close()
}

// Drain will put a connection into a drain state. All subscriptions will
Expand All @@ -4557,7 +4607,7 @@ func (nc *Conn) Drain() error {
}
if nc.isConnecting() || nc.isReconnecting() {
nc.mu.Unlock()
nc.close(CLOSED, true, nil)
nc.Close()
return ErrConnectionReconnecting
}
if nc.isDraining() {
Expand Down

0 comments on commit 995f671

Please sign in to comment.