Skip to content
This repository has been archived by the owner on May 1, 2020. It is now read-only.

Commit

Permalink
Implement background autoconnect
Browse files Browse the repository at this point in the history
Implement the background autoconnect feature which automatically
maintains the connection and tries to reconnect whenever the
connection is lost when autoconnect is enabled.
  • Loading branch information
romshark committed Mar 23, 2018
1 parent 29e0688 commit a7b0974
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 141 deletions.
1 change: 1 addition & 0 deletions client.go
Expand Up @@ -64,6 +64,7 @@ func (clt *Client) unlink() {

clt.connected = false
clt.Session = nil
clt.conn.Close()

clt.sessionLock.Unlock()
clt.connLock.Unlock()
Expand Down
60 changes: 50 additions & 10 deletions client/client.go
Expand Up @@ -16,10 +16,23 @@ import (

const supportedProtocolVersion = "1.2"

type ClientStatus = int32

const (
// StatDisabled represents a client instance that has been manually closed, thus disabled
StatDisabled ClientStatus = 0

// StatDisconnected represents a temporarily disconnected client instance
StatDisconnected ClientStatus = 1

// StatConnected represents a connected client instance
StatConnected ClientStatus = 2
)

// Client represents an instance of one of the servers clients
type Client struct {
serverAddr string
isConnected int32
status ClientStatus
defaultReqTimeout time.Duration
reconnInterval time.Duration
autoconnect bool
Expand All @@ -33,7 +46,16 @@ type Client struct {
// because performing multiple requests and/or signals simultaneously is fine.
// The Connect, RestoreSession, CloseSession and Close methods are locked exclusively
// because they should temporarily block any other interaction with this client instance.
apiLock sync.RWMutex
apiLock sync.RWMutex

// connectingLock protects the connecting channel from direct concurrent mutations
connectingLock sync.RWMutex
// connecting is a channel that is nil when the client is connected and is only initialized
// when the client loses connection and needs to spawn an autoconnector goroutine.
// It prevents multiple autoconnection attempts from spawning superfluous multiple goroutines
// each polling the server
connecting chan error

connectLock sync.Mutex
connLock sync.Mutex
conn *websocket.Conn
Expand All @@ -47,16 +69,18 @@ type Client struct {

// NewClient creates a new client instance.
func NewClient(serverAddress string, opts Options) *Client {
// Prepare configuration
opts.SetDefaults()

autoconnect := true
if opts.Autoconnect == OptDisabled {
autoconnect = false
}

return &Client{
// Initialize new client
newClt := &Client{
serverAddress,
0,
StatDisconnected,
opts.DefaultRequestTimeout,
opts.ReconnectionInterval,
autoconnect,
Expand All @@ -66,6 +90,8 @@ func NewClient(serverAddress string, opts Options) *Client {
nil,

sync.RWMutex{},
sync.RWMutex{},
nil,
sync.Mutex{},
sync.Mutex{},
nil,
Expand All @@ -83,11 +109,24 @@ func NewClient(serverAddress string, opts Options) *Client {
log.Ldate|log.Ltime|log.Lshortfile,
),
}

if autoconnect {
// Asynchronously connect to the server immediately after initialization.
// Call in another goroutine to not block the contructor function caller.
// Set timeout to zero, try indefinitely until connected.
go newClt.tryAutoconnect(0)
}

return newClt
}

// IsConnected returns true if the client is connected to the server, otherwise false is returned
func (clt *Client) IsConnected() bool {
return atomic.LoadInt32(&clt.isConnected) > 0
// Status returns the current client status
// which is either disabled, disconnected or connected.
// The client is considered disabled when it was manually closed through client.Close,
// while disconnected is considered a temporary connection loss.
// A disabled client won't autoconnect until enabled again.
func (clt *Client) Status() ClientStatus {
return atomic.LoadInt32(&clt.status)
}

// Connect connects the client to the configured server and
Expand Down Expand Up @@ -203,6 +242,7 @@ func (clt *Client) PendingRequests() int {
// RestoreSession tries to restore the previously opened session.
// Fails if a session is currently already active
func (clt *Client) RestoreSession(sessionKey []byte) error {
// TODO: restore session is a mutating method and should acquire an exclusive lock
clt.apiLock.RLock()
defer clt.apiLock.RUnlock()

Expand Down Expand Up @@ -244,7 +284,7 @@ func (clt *Client) CloseSession() error {
clt.sessionLock.RUnlock()

// Synchronize session closure to the server if connected
if atomic.LoadInt32(&clt.isConnected) > 0 {
if atomic.LoadInt32(&clt.status) == StatConnected {
if _, err := clt.sendNamelessRequest(
webwire.MsgCloseSession,
webwire.Payload{},
Expand All @@ -262,8 +302,8 @@ func (clt *Client) CloseSession() error {
return nil
}

// Close gracefully closes the connection.
// Does nothing if the client isn't connected
// Close gracefully closes the connection and disables the client.
// A disabled client won't autoconnect until enabled again.
func (clt *Client) Close() {
clt.apiLock.Lock()
defer clt.apiLock.Unlock()
Expand Down
9 changes: 6 additions & 3 deletions client/close.go
@@ -1,15 +1,18 @@
package client

import "sync/atomic"
import (
"sync/atomic"
)

func (clt *Client) close() {
clt.connLock.Lock()
defer clt.connLock.Unlock()
if atomic.LoadInt32(&clt.isConnected) < 1 {
if atomic.LoadInt32(&clt.status) < StatConnected {
// Either disconnected or disabled
return
}
if err := clt.conn.Close(); err != nil {
clt.errorLog.Printf("Failed closing connection: %s", err)
}
atomic.StoreInt32(&clt.isConnected, 0)
atomic.StoreInt32(&clt.status, StatDisabled)
}
40 changes: 34 additions & 6 deletions client/connect.go
Expand Up @@ -9,10 +9,17 @@ import (
webwire "github.com/qbeon/webwire-go"
)

// connect will try to establish a connection to the configured webwire server
// and try to automatically restore the session if there is any.
// If the session restoration fails connect won't fail, instead it will reset the current session
// and return normally.
// Before establishing the connection - connect verifies protocol compatibility and returns an
// error if the protocol implemented by the server doesn't match the required protocol version
// of this client instance.
func (clt *Client) connect() (err error) {
clt.connectLock.Lock()
defer clt.connectLock.Unlock()
if atomic.LoadInt32(&clt.isConnected) > 0 {
if atomic.LoadInt32(&clt.status) == StatConnected {
return nil
}

Expand All @@ -23,6 +30,11 @@ func (clt *Client) connect() (err error) {
connURL := url.URL{Scheme: "ws", Host: clt.serverAddr, Path: "/"}

clt.connLock.Lock()
if clt.conn != nil {
if err := clt.conn.Close(); err != nil {
panic(err)
}
}
clt.conn, _, err = websocket.DefaultDialer.Dial(connURL.String(), nil)
if err != nil {
return webwire.NewDisconnectedErr(fmt.Errorf("Dial failure: %s", err))
Expand All @@ -42,11 +54,27 @@ func (clt *Client) connect() (err error) {
) {
// Error while reading message
clt.errorLog.Print("Failed reading message:", err)
break
} else {
// Shutdown client due to clean disconnection
break
}

// Set status to disconnected if it wasn't disabled
if atomic.LoadInt32(&clt.status) == StatConnected {
atomic.StoreInt32(&clt.status, StatDisconnected)
}

// Call hook
clt.hooks.OnDisconnected()

// Try to reconnect if the client wasn't disabled and autoconnect is on.
// reconnect in another goroutine to let this one die and free up the socket
go func() {
if clt.autoconnect && atomic.LoadInt32(&clt.status) != StatDisabled {
if err := clt.tryAutoconnect(0); err != nil {
clt.errorLog.Printf("Auto-reconnect failed after connection loss: %s", err)
return
}
}
}()
return
}
// Try to handle the message
if err = clt.handleMessage(message); err != nil {
Expand All @@ -55,7 +83,7 @@ func (clt *Client) connect() (err error) {
}
}()

atomic.StoreInt32(&clt.isConnected, 1)
atomic.StoreInt32(&clt.status, StatConnected)

// Read the current sessions key if there is any
clt.sessionLock.RLock()
Expand Down
8 changes: 8 additions & 0 deletions client/hooks.go
Expand Up @@ -4,6 +4,10 @@ import webwire "github.com/qbeon/webwire-go"

// Hooks represents all callback hook functions
type Hooks struct {
// OnDisconnected is an optional callback.
// It's invoked when the client is disconnected from the server for any reason.
OnDisconnected func()

// OnServerSignal is an optional callback.
// It's invoked when the webwire client receives a signal from the server
OnServerSignal func(payload webwire.Payload)
Expand All @@ -20,6 +24,10 @@ type Hooks struct {

// SetDefaults sets undefined required hooks
func (hooks *Hooks) SetDefaults() {
if hooks.OnDisconnected == nil {
hooks.OnDisconnected = func() {}
}

if hooks.OnServerSignal == nil {
hooks.OnServerSignal = func(_ webwire.Payload) {}
}
Expand Down
80 changes: 46 additions & 34 deletions client/tryAutoconnect.go
Expand Up @@ -8,46 +8,58 @@ import (
)

func (clt *Client) tryAutoconnect(timeout time.Duration) error {
if atomic.LoadInt32(&clt.isConnected) > 0 {
return nil
}

// If autoconnect is enabled the client will spawn a new autoconnector goroutine which
// will periodically poll the server and check whether it's available again.
// If the autoconnector goroutine has already been spawned then tryAutoconnect will
// just await the connection or timeout respectively
if clt.autoconnect {
stopTrying := make(chan error, 1)
connected := make(chan error, 1)
go func() {
for {
select {
case <-stopTrying:
return
default:
}
if atomic.LoadInt32(&clt.status) == StatConnected {
return nil
}

err := clt.connect()
switch err := err.(type) {
case nil:
close(connected)
return
case webwire.DisconnectedErr:
time.Sleep(clt.reconnInterval)
default:
// Unexpected error
connected <- err
return
clt.connectingLock.RLock()
// Start the reconnector goroutine if not already started.
// If it's already started then just proceed to wait until either connected or timed out
if clt.connecting == nil {
clt.connecting = make(chan error, 1)
go func() {
for {
err := clt.connect()
switch err := err.(type) {
case nil:
clt.connectingLock.Lock()
close(clt.connecting)
clt.connecting = nil
clt.connectingLock.Unlock()
return
case webwire.DisconnectedErr:
time.Sleep(clt.reconnInterval)
default:
// Unexpected error
clt.connecting <- err
return
}
}
}
}()
}()
}
clt.connectingLock.RUnlock()

// TODO: implement autoconnect
select {
case err := <-connected:
return err
case <-time.After(timeout):
// Stop reconnection trial loop and return timeout error
close(stopTrying)
return webwire.ReqTimeoutErr{}
if timeout > 0 {
select {
case err := <-clt.connecting:
return err
case <-time.After(timeout):
// TODO: set missing timeout target
return webwire.ReqTimeoutErr{}
}
} else {
// Try indefinitely
return <-clt.connecting
}
} else {
if atomic.LoadInt32(&clt.status) == StatConnected {
return nil
}
if err := clt.connect(); err != nil {
return err
}
Expand Down
13 changes: 3 additions & 10 deletions examples/chatroom/client/main.go
Expand Up @@ -2,16 +2,14 @@ package main

import (
"flag"
"fmt"
"log"
"time"

webwireClient "github.com/qbeon/webwire-go/client"
)

var client *webwireClient.Client

var serverAddr = flag.String("addr", ":8081", "server address")
var serverAddr = flag.String("addr", ":9090", "server address")
var password = flag.String("pass", "", "password")
var username = flag.String("name", "", "username")

Expand All @@ -29,17 +27,12 @@ func main() {
OnSessionCreated: onSessionCreated,
},
// Default timeout for timed requests
DefaultRequestTimeout: 5 * time.Second,
DefaultRequestTimeout: 10 * time.Second,
ReconnectionInterval: 2 * time.Second,
},
)
defer client.Close()

// Connect to the server
if err := client.Connect(); err != nil {
panic(fmt.Errorf("Couldn't connect to the server: %s", err))
}
log.Printf("Connected to server %s", *serverAddr)

// Authenticate
if *username != "" && *password != "" {
authenticate()
Expand Down
3 changes: 2 additions & 1 deletion examples/chatroom/client/mainLoop.go
Expand Up @@ -61,7 +61,8 @@ MAINLOOP:
}
fmt.Println("Logged out, you're anonymous now")
default:
if err := client.Signal("", webwire.Payload{
// Send the message and await server reply for the message to be considered posted
if _, err := client.Request("msg", webwire.Payload{
Data: []byte(input),
}); err != nil {
log.Printf("WARNING: Couldn't send message: %s", err)
Expand Down

0 comments on commit a7b0974

Please sign in to comment.