Skip to content

Commit

Permalink
Websocket: Simplify the connectionMonitor
Browse files Browse the repository at this point in the history
  • Loading branch information
Beadko committed Mar 12, 2024
1 parent a13140f commit ba36511
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 21 deletions.
21 changes: 2 additions & 19 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ var (

// Private websocket errors
var (
errAlreadyRunning = errors.New("connection monitor is already running")
errExchangeConfigIsNil = errors.New("exchange config is nil")
errExchangeConfigEmpty = errors.New("exchange config is empty")
errWebsocketIsNil = errors.New("websocket is nil")
Expand Down Expand Up @@ -292,9 +291,7 @@ func (w *Websocket) Connect() error {
}
w.setState(connected)

if !w.IsConnectionMonitorRunning() {
go w.connectionMonitor()
}
go w.connectionMonitor()

subs, err := w.GenerateSubs() // regenerate state on new connection
if err != nil {
Expand Down Expand Up @@ -374,7 +371,6 @@ func (w *Websocket) dataMonitor() {

// connectionMonitor ensures that the WS keeps connecting
func (w *Websocket) connectionMonitor() {
w.setConnectionMonitorRunning(true)
delay := w.connectionMonitorDelay
timer := time.NewTimer(delay)
for {
Expand All @@ -394,7 +390,7 @@ func (w *Websocket) connectionMonitor() {
log.Debugf(log.WebsocketMgr, "%v websocket: connection monitor exiting", w.exchangeName)
}
timer.Stop()
w.setConnectionMonitorRunning(false)
w.connectionMonitorRunning.Store(false)
return
}
select {
Expand Down Expand Up @@ -628,19 +624,6 @@ func (w *Websocket) IsTrafficMonitorRunning() bool {
return w.trafficMonitorRunning.Load()
}

func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) {
return !w.connectionMonitorRunning.CompareAndSwap(false, true)
}

func (w *Websocket) setConnectionMonitorRunning(b bool) {
w.connectionMonitorRunning.Store(b)
}

// IsConnectionMonitorRunning returns status of connection monitor
func (w *Websocket) IsConnectionMonitorRunning() bool {
return w.connectionMonitorRunning.Load()
}

func (w *Websocket) setDataMonitorRunning(b bool) {
w.dataMonitorRunning.Store(b)
}
Expand Down
5 changes: 3 additions & 2 deletions exchanges/stream/websocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,13 +571,14 @@ func TestConnectionMonitor(t *testing.T) {
ws.exchangeName = "hello"
ws.Wg = &sync.WaitGroup{}
ws.setEnabled(true)
ws.connectionMonitorRunning.Store(true)
go ws.connectionMonitor()
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.True(c, ws.IsConnectionMonitorRunning(), "IsConnectionMonitorRunning should return true")
assert.True(c, ws.connectionMonitorRunning.Load(), "IsConnectionMonitorRunning should return true")
}, 5*time.Second, 10*time.Millisecond, "ConnectionMonitor must be running")
ws.setEnabled(false)
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.False(c, ws.IsConnectionMonitorRunning(), "IsConnectionMonitorRunning should return false")
assert.False(c, ws.connectionMonitorRunning.Load(), "IsConnectionMonitorRunning should return false")
}, 5*time.Second, 10*time.Millisecond, "ConnectionMonitor must not be running")
}

Expand Down

0 comments on commit ba36511

Please sign in to comment.