Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket: Simplify the connectionMonitor #1502

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
258 changes: 94 additions & 164 deletions exchanges/stream/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ var (

// Private websocket errors
var (
errAlreadyRunning = errors.New("connection monitor is already running")
errExchangeConfigIsNil = errors.New("exchange config is nil")
errExchangeConfigEmpty = errors.New("exchange config is empty")
errWebsocketIsNil = errors.New("websocket is nil")
Expand Down Expand Up @@ -281,8 +280,9 @@ func (w *Websocket) Connect() error {
w.subscriptions = subscriptionMap{}
w.subscriptionMutex.Unlock()

w.dataMonitor()
w.trafficMonitor()
w.Wg.Add(2)
go w.dataMonitor()
go w.trafficMonitor()
w.setState(connecting)

err := w.connector()
Expand All @@ -291,12 +291,8 @@ func (w *Websocket) Connect() error {
return fmt.Errorf("%v Error connecting %w", w.exchangeName, err)
}
w.setState(connected)

if !w.IsConnectionMonitorRunning() {
err = w.connectionMonitor()
if err != nil {
log.Errorf(log.WebsocketMgr, "%s cannot start websocket connection monitor %v", w.GetName(), err)
}
if !w.connectionMonitorRunning.CompareAndSwap(false, true) {
go w.connectionMonitor()
}

subs, err := w.GenerateSubs() // regenerate state on new connection
Expand Down Expand Up @@ -340,99 +336,74 @@ func (w *Websocket) Enable() error {

// dataMonitor monitors job throughput and logs if there is a back log of data
func (w *Websocket) dataMonitor() {
if w.IsDataMonitorRunning() {
return
}
w.setDataMonitorRunning(true)
w.Wg.Add(1)

go func() {
defer func() {
w.setDataMonitorRunning(false)
w.Wg.Done()
}()
dropped := 0
for {
defer w.Wg.Done()
dropped := 0
for {
select {
case <-w.ShutdownC:
return
case d := <-w.DataHandler:
select {
case <-w.ShutdownC:
return
case d := <-w.DataHandler:
select {
case w.ToRoutine <- d:
if dropped != 0 {
log.Infof(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer recovered; %d messages were dropped", w.exchangeName, dropped)
dropped = 0
}
default:
if dropped == 0 {
// If this becomes prone to flapping we could drain the buffer, but that's extreme and we'd like to avoid it if possible
log.Warnf(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer full; dropping messages", w.exchangeName)
}
dropped++
case w.ToRoutine <- d:
if dropped != 0 {
log.Infof(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer recovered; %d messages were dropped", w.exchangeName, dropped)
dropped = 0
}
default:
if dropped == 0 {
// If this becomes prone to flapping we could drain the buffer, but that's extreme and we'd like to avoid it if possible
log.Warnf(log.WebsocketMgr, "%s exchange websocket ToRoutine channel buffer full; dropping messages", w.exchangeName)
}
dropped++
}
}
}()
}
}

// connectionMonitor ensures that the WS keeps connecting
func (w *Websocket) connectionMonitor() error {
if w.checkAndSetMonitorRunning() {
Beadko marked this conversation as resolved.
Show resolved Hide resolved
return errAlreadyRunning
}
delay := w.connectionMonitorDelay

go func() {
timer := time.NewTimer(delay)
for {
func (w *Websocket) connectionMonitor() {
timer := time.NewTimer(w.connectionMonitorDelay)
for {
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: running connection monitor cycle", w.exchangeName)
}
if !w.IsEnabled() {
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: running connection monitor cycle", w.exchangeName)
log.Debugf(log.WebsocketMgr, "%v websocket: connectionMonitor - websocket disabled, shutting down", w.exchangeName)
}
if !w.IsEnabled() {
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: connectionMonitor - websocket disabled, shutting down", w.exchangeName)
if w.IsConnected() {
if err := w.Shutdown(); err != nil {
log.Errorln(log.WebsocketMgr, err)
}
}
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: connection monitor exiting", w.exchangeName)
}
timer.Stop()
w.connectionMonitorRunning.Store(false)
return
}
select {
case err := <-w.ReadMessageErrors:
w.DataHandler <- err
if IsDisconnectionError(err) {
log.Warnf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", w.exchangeName, err)
if w.IsConnected() {
if err := w.Shutdown(); err != nil {
log.Errorln(log.WebsocketMgr, err)
if shutdownErr := w.Shutdown(); shutdownErr != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: connectionMonitor shutdown err: %s", w.exchangeName, shutdownErr)
}
}
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: connection monitor exiting", w.exchangeName)
}
timer.Stop()
w.setConnectionMonitorRunning(false)
return
}
select {
case err := <-w.ReadMessageErrors:
w.DataHandler <- err
if IsDisconnectionError(err) {
log.Warnf(log.WebsocketMgr, "%v websocket has been disconnected. Reason: %v", w.exchangeName, err)
if w.IsConnected() {
if shutdownErr := w.Shutdown(); shutdownErr != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: connectionMonitor shutdown err: %s", w.exchangeName, shutdownErr)
}
}
}
case <-timer.C:
if !w.IsConnecting() && !w.IsConnected() {
err := w.Connect()
if err != nil {
log.Errorln(log.WebsocketMgr, err)
}
}
if !timer.Stop() {
select {
case <-timer.C:
default:
}
case <-timer.C:
if !w.IsConnecting() && !w.IsConnected() {
err := w.Connect()
if err != nil {
log.Errorln(log.WebsocketMgr, err)
}
timer.Reset(delay)
}
timer.Reset(w.connectionMonitorDelay)
}
}()
return nil
}
}

// Shutdown attempts to shut down a websocket connection and associated routines
Expand Down Expand Up @@ -545,59 +516,49 @@ func (w *Websocket) FlushChannels() error {
// 1 slot buffer means that connection will only write to trafficAlert once per trafficCheckInterval to avoid read/write flood in high traffic
// Otherwise we Shutdown the connection after trafficTimeout, unless it's connecting. connectionMonitor is responsible for Connecting again
func (w *Websocket) trafficMonitor() {
if w.IsTrafficMonitorRunning() {
return
}
w.setTrafficMonitorRunning(true)
w.Wg.Add(1)

go func() {
t := time.NewTimer(w.trafficTimeout)
for {
t := time.NewTimer(w.trafficTimeout)
for {
select {
case <-w.ShutdownC:
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown message received", w.exchangeName)
}
t.Stop()
w.Wg.Done()
return
// this case ensures the timer is reset as close to when the traffic is received
case <-time.After(trafficCheckInterval):
shazbert marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-w.ShutdownC:
if w.verbose {
log.Debugf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown message received", w.exchangeName)
}
t.Stop()
w.setTrafficMonitorRunning(false)
w.Wg.Done()
return
case <-time.After(trafficCheckInterval):
select {
case <-w.TrafficAlert:
if !t.Stop() {
<-t.C
}
t.Reset(w.trafficTimeout)
default:
case <-w.TrafficAlert:
if !t.Stop() {
<-t.C
}
case <-t.C:
checkAgain := w.IsConnecting()
select {
case <-w.TrafficAlert:
checkAgain = true
default:
}
if checkAgain {
t.Reset(w.trafficTimeout)
break
}
if w.verbose {
log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
w.setTrafficMonitorRunning(false) // Cannot defer lest Connect is called after Shutdown but before deferred call
w.Wg.Done() // Without this the w.Shutdown() call below will deadlock
if w.IsConnected() {
err := w.Shutdown()
if err != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err)
}
t.Reset(w.trafficTimeout)
default:
}
case <-t.C:
checkAgain := w.IsConnecting()
select {
case <-w.TrafficAlert:
checkAgain = true
default:
}
if checkAgain {
t.Reset(w.trafficTimeout)
break
}
if w.verbose {
log.Warnf(log.WebsocketMgr, "%v websocket: has not received a traffic alert in %v. Reconnecting", w.exchangeName, w.trafficTimeout)
}
w.Wg.Done() // Without this the w.Shutdown() call below will deadlock
if w.IsConnected() {
if err := w.Shutdown(); err != nil {
log.Errorf(log.WebsocketMgr, "%v websocket: trafficMonitor shutdown err: %s", w.exchangeName, err)
}
return
}
return
}
}()
}
}

func (w *Websocket) setState(s uint32) {
Expand Down Expand Up @@ -628,37 +589,6 @@ func (w *Websocket) IsEnabled() bool {
return w.enabled.Load()
}

func (w *Websocket) setTrafficMonitorRunning(b bool) {
w.trafficMonitorRunning.Store(b)
}

// IsTrafficMonitorRunning returns status of the traffic monitor
func (w *Websocket) IsTrafficMonitorRunning() bool {
return w.trafficMonitorRunning.Load()
}

func (w *Websocket) checkAndSetMonitorRunning() (alreadyRunning bool) {
return !w.connectionMonitorRunning.CompareAndSwap(false, true)
}

func (w *Websocket) setConnectionMonitorRunning(b bool) {
w.connectionMonitorRunning.Store(b)
}

// IsConnectionMonitorRunning returns status of connection monitor
func (w *Websocket) IsConnectionMonitorRunning() bool {
return w.connectionMonitorRunning.Load()
}

func (w *Websocket) setDataMonitorRunning(b bool) {
w.dataMonitorRunning.Store(b)
}

// IsDataMonitorRunning returns status of data monitor
func (w *Websocket) IsDataMonitorRunning() bool {
return w.dataMonitorRunning.Load()
}

// CanUseAuthenticatedWebsocketForWrapper Handles a common check to
// verify whether a wrapper can use an authenticated websocket endpoint
func (w *Websocket) CanUseAuthenticatedWebsocketForWrapper() bool {
Expand Down