Skip to content
This repository has been archived by the owner on May 1, 2020. It is now read-only.

Commit

Permalink
Fix bug
Browse files Browse the repository at this point in the history
Fix a data race caused by unsynchronized access to the socket.
Make close wait for the reader goroutine to die before returning.
  • Loading branch information
romshark committed Mar 31, 2018
1 parent 1a5445e commit 85933a3
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 38 deletions.
49 changes: 36 additions & 13 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@ const supportedProtocolVersion = "1.2"
type Status = int32

const (
// StatDisabled represents a client instance that has been manually closed, thus disabled
StatDisabled Status = 0

// StatDisconnected represents a temporarily disconnected client instance
StatDisconnected Status = 1
StatDisconnected Status = 0

// StatConnected represents a connected client instance
StatConnected Status = 2
StatConnected Status = 1
)

// Autoconnect represents the activation of automatic reconnection
type Autoconnect = int32

const (
// AutoconnectDisabled represents deactivated automatic reconnection
AutoconnectDisabled = 0

// AutoconnectEnabled represents activated automatic reconnection
AutoconnectEnabled = 1
)

// Client represents an instance of one of the servers clients
Expand All @@ -34,7 +42,7 @@ type Client struct {
status Status
defaultReqTimeout time.Duration
reconnInterval time.Duration
autoconnect bool
autoconnect int32
hooks Hooks

sessionLock sync.RWMutex
Expand All @@ -55,8 +63,9 @@ type Client struct {
// connectingLock protects the connecting flag from concurrent access
connectingLock sync.RWMutex

connectLock sync.Mutex
conn webwire.Socket
connectLock sync.Mutex
conn webwire.Socket
readerClosing chan bool

requestManager reqman.RequestManager

Expand All @@ -70,9 +79,9 @@ func NewClient(serverAddress string, opts Options) *Client {
// Prepare configuration
opts.SetDefaults()

autoconnect := true
autoconnect := int32(1)
if opts.Autoconnect == OptDisabled {
autoconnect = false
autoconnect = int32(0)
}

// Initialize new client
Expand All @@ -93,6 +102,7 @@ func NewClient(serverAddress string, opts Options) *Client {
sync.RWMutex{},
sync.Mutex{},
newSocket(nil),
make(chan bool, 1),

reqman.NewRequestManager(),

Expand All @@ -108,7 +118,7 @@ func NewClient(serverAddress string, opts Options) *Client {
),
}

if autoconnect {
if autoconnect == AutoconnectEnabled {
// Asynchronously connect to the server immediately after initialization.
// Call in another goroutine to not block the contructor function caller.
// Set timeout to zero, try indefinitely until connected.
Expand All @@ -129,8 +139,10 @@ func (clt *Client) Status() Status {

// Connect connects the client to the configured server and
// returns an error in case of a connection failure.
// Automatically tries to restore the previous session
// Automatically tries to restore the previous session.
// Enables autoconnect if it was disabled
func (clt *Client) Connect() error {
atomic.StoreInt32(&clt.autoconnect, AutoconnectEnabled)
return clt.connect()
}

Expand Down Expand Up @@ -298,5 +310,16 @@ func (clt *Client) CloseSession() error {
func (clt *Client) Close() {
clt.apiLock.Lock()
defer clt.apiLock.Unlock()
clt.close()

atomic.StoreInt32(&clt.autoconnect, AutoconnectDisabled)

if atomic.LoadInt32(&clt.status) != StatConnected {
return
}
if err := clt.conn.Close(); err != nil {
clt.errorLog.Printf("Failed closing connection: %s", err)
}

// Wait for the reader goroutine to die before returning
<-clt.readerClosing
}
15 changes: 0 additions & 15 deletions client/close.go
Original file line number Diff line number Diff line change
@@ -1,16 +1 @@
package client

import (
"sync/atomic"
)

func (clt *Client) close() {
if atomic.LoadInt32(&clt.status) < StatConnected {
// Either disconnected or disabled
return
}
if err := clt.conn.Close(); err != nil {
clt.errorLog.Printf("Failed closing connection: %s", err)
}
atomic.StoreInt32(&clt.status, StatDisabled)
}
21 changes: 12 additions & 9 deletions client/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ func (clt *Client) connect() error {

// Setup reader thread
go func() {
defer clt.close()
defer func() {
// Set status
atomic.StoreInt32(&clt.status, StatDisconnected)
select {
case clt.readerClosing <- true:
default:
}
}()
for {
message, err := clt.conn.Read()
if err != nil {
Expand All @@ -37,18 +44,14 @@ func (clt *Client) connect() error {
clt.errorLog.Print("Abnormal closure error:", err)
}

// Set status to disconnected if it wasn't disabled
if atomic.LoadInt32(&clt.status) == StatConnected {
atomic.StoreInt32(&clt.status, StatDisconnected)
}

// Call hook
clt.hooks.OnDisconnected()

// Try to reconnect if the client wasn't disabled and autoconnect is on.
// reconnect in another goroutine to let this one die and free up the socket
// Try to reconnect if autoconn wasn't disabled.
// reconnect in another goroutine to let this one die
// and free up the socket
go func() {
if clt.autoconnect && atomic.LoadInt32(&clt.status) != StatDisabled {
if atomic.LoadInt32(&clt.autoconnect) == AutoconnectEnabled {
if err := clt.tryAutoconnect(0); err != nil {
clt.errorLog.Printf("Auto-reconnect failed after connection loss: %s", err)
return
Expand Down
2 changes: 1 addition & 1 deletion client/tryAutoconnect.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func (clt *Client) tryAutoconnect(timeout time.Duration) error {
// will periodically poll the server and check whether it's available again.
// If the autoconnector goroutine has already been spawned then tryAutoconnect will
// just await the connection or timeout respectively
if clt.autoconnect {
if atomic.LoadInt32(&clt.autoconnect) == AutoconnectEnabled {
if atomic.LoadInt32(&clt.status) == StatConnected {
return nil
}
Expand Down
6 changes: 6 additions & 0 deletions test/disabledSessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
wwrclt "github.com/qbeon/webwire-go/client"
)

// TODO: change wrong test case description
// TestDisabledSessions verifies the server is connectable,
// and is able to receives requests and signals, create sessions
// and identify clients during request- and signal handling
Expand Down Expand Up @@ -53,6 +54,11 @@ func TestDisabledSessions(t *testing.T) {
server.Addr().String(),
wwrclt.Options{
DefaultRequestTimeout: 2 * time.Second,
Hooks: wwrclt.Hooks{
OnSessionCreated: func(*wwr.Session) {
t.Errorf("OnSessionCreated was not expected to be called")
},
},
},
)
defer client.Close()
Expand Down

0 comments on commit 85933a3

Please sign in to comment.