Skip to content

Commit

Permalink
Merge pull request #41 from kacf/1.0.x
Browse files Browse the repository at this point in the history
MEN-4694 Break reconnection loop on auth lost
  • Loading branch information
kacf committed Jul 7, 2021
2 parents 20b639a + bda447b commit bf966d2
Showing 1 changed file with 51 additions and 1 deletion.
52 changes: 51 additions & 1 deletion connectionmanager/connectionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ var (
ErrConnectionRetriesExhausted = errors.New("failed to connect after max number of retries")
)

var (
reconnectingMutex = sync.Mutex{}
reconnecting = map[ws.ProtoType]bool{}
cancelReconnectChan = map[ws.ProtoType]chan bool{}
)

type ProtocolHandler struct {
proto ws.ProtoType
connection *connection.Connection
Expand Down Expand Up @@ -77,9 +83,14 @@ func connect(proto ws.ProtoType, serverUrl, connectUrl, token string, skipVerify
scheme := getWebSocketScheme(parsedUrl.Scheme)
u := url.URL{Scheme: scheme, Host: parsedUrl.Host, Path: connectUrl}

cancelReconnectChan[proto] = make(chan bool)
var c *connection.Connection
var i uint = 0
for {
defer func() {
setReconnecting(proto, false)
}()
setReconnecting(proto, true)
for IsReconnecting(proto) {
i++
c, err = connection.NewConnection(u, token, writeWait, maxMessageSize, defaultPingWait, skipVerify, serverCertificate)
if err != nil || c == nil {
Expand All @@ -91,6 +102,11 @@ func connect(proto ws.ProtoType, serverUrl, connectUrl, token string, skipVerify
"reconnecting in %ds (try %d/%d); len(token)=%d", serverUrl, connectUrl,
err.Error(), reconnectIntervalSeconds, i, retries, len(token))
select {
case cancelFlag := <-cancelReconnectChan[proto]:
log.Tracef("connectionmanager connect got cancelFlag=%+v", cancelFlag)
if cancelFlag {
return nil
}
case <-ctx.Done():
return nil
case <-time.After(time.Second * time.Duration(reconnectIntervalSeconds)):
Expand All @@ -111,6 +127,8 @@ func connect(proto ws.ProtoType, serverUrl, connectUrl, token string, skipVerify
connection: c,
mutex: &sync.Mutex{},
}

log.Tracef("connectionmanager connect returning")
return nil
}

Expand Down Expand Up @@ -166,7 +184,39 @@ func Write(proto ws.ProtoType, m *ws.ProtoMsg) error {
return h.connection.WriteMessage(m)
}

func IsReconnecting(proto ws.ProtoType) bool {
reconnectingMutex.Lock()
defer reconnectingMutex.Unlock()
return reconnecting[proto]
}

func setReconnecting(proto ws.ProtoType, v bool) bool {
reconnectingMutex.Lock()
defer reconnectingMutex.Unlock()
reconnecting[proto] = v
return v
}

func CancelReconnection(proto ws.ProtoType) {
maxWaitSeconds := 8
cancelReconnectChan[proto] <- true
for maxWaitSeconds > 0 {
time.Sleep(time.Second)
if !IsReconnecting(proto) {
break
}
maxWaitSeconds--
}
if IsReconnecting(proto) {
log.Error("failed to cancel reconnection")
}
}

func Close(proto ws.ProtoType) error {
if IsReconnecting(proto) {
CancelReconnection(proto)
}

handlersByTypeMutex.Lock()
defer handlersByTypeMutex.Unlock()

Expand Down

0 comments on commit bf966d2

Please sign in to comment.