Skip to content

Commit

Permalink
Merge pull request #53 from lluiscampos/1.0.x-MEN-5290-fix-reconnect
Browse files Browse the repository at this point in the history
1.0.x: MEN-5290: Several robustness fixes around re-connection
  • Loading branch information
lluiscampos committed Jan 3, 2022
2 parents bf966d2 + ea30e2a commit 0afe7b7
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 472 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright 2021 Northern.tech AS
Copyright 2022 Northern.tech AS

All content in this project is licensed under the Apache License v2, unless
indicated otherwise.
Expand Down
2 changes: 1 addition & 1 deletion LIC_FILES_CHKSUM.sha256
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#
# Apache-2.0 license.
b4acfcfa2a0ba1a8c82ec3965fbcee886cff8394ca4214e0ddac0a36beb1e05a LICENSE
1033348db7606a7e61b6484f293847cf8d7a35766efebb97e304d4bd5d7f3f6b LICENSE
cfc7749b96f63bd31c3c42b5c471bf756814053e847c10f3eb003417bc523d30 vendor/google.golang.org/appengine/LICENSE
d18f6323b71b0b768bb5e9616e36da390fbd39369a81807cca352de4e4e6aa0b vendor/gopkg.in/yaml.v3/LICENSE
3eb823230e5d112e1bd032ccc82ae765cf676d0d6d46a1a1daa2d658b3005b67 vendor/github.com/mendersoftware/go-lib-micro/LICENSE
Expand Down
178 changes: 98 additions & 80 deletions app/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"os/user"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -44,6 +45,7 @@ const (
EventReconnect = "reconnect"
EventReconnectRequest = "reconnect-req"
EventConnectionEstablished = "connected"
EventConnectionError = "connected-error"
)

const (
Expand Down Expand Up @@ -71,6 +73,7 @@ type MenderShellDaemon struct {
username string
shell string
shellArguments []string
serverJwt string
serverUrl string
serverCertificate string
skipVerify bool
Expand Down Expand Up @@ -178,26 +181,56 @@ func (d *MenderShellDaemon) outputStatus() {
}

func (d *MenderShellDaemon) messageLoop() (err error) {
log.Debug("messageLoop: starting")
log.Trace("messageLoop: starting")
sendConnectReq := true
waitConnectResp := true
for {
if d.shouldStop() {
log.Debug("messageLoop: returning")
break
}

log.Debug("messageLoop: calling readMessage")
message, err := d.readMessage()
log.Debugf("messageLoop: called readMessage: %v,%v", message, err)
if err != nil {
log.Errorf("messageLoop: error on readMessage: %v; disconnecting, waiting for reconnect.", err)
connectionmanager.Close(ws.ProtoTypeShell)
if sendConnectReq {
e := MenderShellDaemonEvent{
event: EventReconnectRequest,
}
log.Debugf("messageLoop: posting event: %s; waiting for response", e.event)
d.reconnectChan <- e
sendConnectReq = false
}

if waitConnectResp {
response := <-d.connectionEstChan
log.Debugf("messageLoop: got response: %+v", response)
log.Tracef("messageLoop: got response: %+v", response)
if response.event == EventConnectionEstablished {
waitConnectResp = false
} else {
// The re-connection failed, retry
sendConnectReq = true
waitConnectResp = true
continue
}
}

log.Trace("messageLoop: calling readMessage")
message, err := d.readMessage()
log.Tracef("messageLoop: called readMessage: %v,%v", message, err)
if err != nil {
log.Errorf(
"messageLoop: error on readMessage: %v; disconnecting, waiting for reconnect.",
err,
)
// nolint:lll
// If we used a closed connection, it means that it has been closed from another goroutine
// and a reconnection is ongoing (or done). Just wait for the event and continue.
// This can happen when dbusEventLoop detects a change in ServerURL and/or JWT token.
// It should be safe to use this string, see:
// https://github.com/golang/go/blob/529939072eef730c82333344f321972874758be8/src/net/error_test.go#L502-L507
if !strings.Contains(err.Error(), "use of closed network connection") {
connectionmanager.Close(ws.ProtoTypeShell)
sendConnectReq = true
}
waitConnectResp = true
continue
}

Expand All @@ -212,43 +245,16 @@ func (d *MenderShellDaemon) messageLoop() (err error) {
return err
}

func (d *MenderShellDaemon) waitForJWTToken(client mender.AuthClient) (string, error) {
tokenStateChange := client.GetJwtTokenStateChangeChannel()
for {
select {
case p := <-tokenStateChange:
if len(p) > 1 && p[1].ParamType == dbus.GDBusTypeString && len(p[1].ParamData.(string)) > 0 {
d.serverUrl = p[1].ParamData.(string)
}
if len(p) > 0 && p[0].ParamType == dbus.GDBusTypeString && len(p[0].ParamData.(string)) > 0 {
return p[0].ParamData.(string), nil
}
case <-d.ctx.Done():
return "", errors.New("unable to get the JWT token")
}
}
}

func (d *MenderShellDaemon) gotAuthToken(p []dbus.SignalParams, needsReconnect bool) string {
jwtToken := p[0].ParamData.(string)
func (d *MenderShellDaemon) processJwtTokenStateChange(jwtToken, serverUrl string) {
jwtTokenLength := len(jwtToken)
if jwtTokenLength > 0 {
if jwtTokenLength > 0 && len(serverUrl) > 0 {
if !d.authorized {
log.Debugf("dbusEventLoop: StateChanged from unauthorized"+
" to authorized, len(token)=%d", jwtTokenLength)
//in hereT technically it is possible we close a closed connection
log.Tracef("dbusEventLoop: StateChanged from unauthorized"+
" to authorized, len(token)=%d, ServerURL=%q", jwtTokenLength, serverUrl)
//in here it is technically possible that we close a closed connection
//but it is not a critical error, the important thing is not to leave
//messageLoop waiting forever on readMessage
connectionmanager.Close(ws.ProtoTypeShell)
if needsReconnect {
e := MenderShellDaemonEvent{
event: EventReconnect,
data: jwtToken,
id: "(gotAuthToken)",
}
log.Debugf("(gotAuthToken) posting Event: %s", e.event)
d.postEvent(e)
}
}
d.authorized = true
} else {
Expand All @@ -267,7 +273,6 @@ func (d *MenderShellDaemon) gotAuthToken(p []dbus.SignalParams, needsReconnect b
connectionmanager.Close(ws.ProtoTypeShell)
d.authorized = false
}
return jwtToken
}

func (d *MenderShellDaemon) needsReconnect() bool {
Expand All @@ -292,12 +297,35 @@ func (d *MenderShellDaemon) dbusEventLoop(client mender.AuthClient) {
needsReconnect = true
}

p, _ := client.WaitForJwtTokenStateChange()
if len(p) > 0 && p[0].ParamType == dbus.GDBusTypeString {
token := d.gotAuthToken(p, needsReconnect)
if len(token) > 0 {
log.Debugf("dbusEventLoop: got a token len=%d", len(token))
needsReconnect = false
p, err := client.WaitForJwtTokenStateChange()
log.Tracef("dbusEventLoop: WaitForJwtTokenStateChange %v, err %v", p, err)
if len(p) > 1 &&
p[0].ParamType == dbus.GDBusTypeString &&
p[1].ParamType == dbus.GDBusTypeString {
token := p[0].ParamData.(string)
serverURL := p[1].ParamData.(string)
d.processJwtTokenStateChange(token, serverURL)
if len(token) > 0 && len(serverURL) > 0 {
log.Tracef("dbusEventLoop: got a token len=%d, ServerURL=%s", len(token), serverURL)
if token != d.serverJwt || serverURL != d.serverUrl {
log.Debugf(
"dbusEventLoop: new token or ServerURL, reconnecting. len=%d, ServerURL=%s",
len(token),
serverURL,
)
needsReconnect = true

// If the server (Mender client proxy) closed the connection, it is likely that
// messageLoop is asking for a reconnection and we also got a JwtTokenStateChange
// signal. So drain the reconnect channel here and reconnect only once.
if d.needsReconnect() {
log.Debug("dbusEventLoop: drained reconnect req channel")
}

}
// TODO: moving these assignments one scope up would make d.authorized redundant...
d.serverJwt = token
d.serverUrl = serverURL
}
}
if needsReconnect && d.authorized {
Expand All @@ -309,6 +337,7 @@ func (d *MenderShellDaemon) dbusEventLoop(client mender.AuthClient) {
}
log.Debugf("(dbusEventLoop) posting Event: %s", e.event)
d.serverUrl = serverURL
d.serverJwt = jwtToken
d.postEvent(e)
needsReconnect = false
}
Expand Down Expand Up @@ -336,14 +365,26 @@ func (d *MenderShellDaemon) eventLoop() {
log.Debugf("eventLoop: got event: %s", event.event)
switch event.event {
case EventReconnect:
err = connectionmanager.Reconnect(ws.ProtoTypeShell, d.serverUrl, d.deviceConnectUrl, event.data, d.skipVerify, d.serverCertificate, configuration.MaxReconnectAttempts, d.ctx)
err = connectionmanager.Reconnect(
ws.ProtoTypeShell,
d.serverUrl,
d.deviceConnectUrl,
event.data,
d.skipVerify,
d.serverCertificate,
configuration.MaxReconnectAttempts,
d.ctx,
)
var event string
if err != nil {
log.Errorf("eventLoop: event: error reconnecting: %s", err.Error())
log.Errorf("eventLoop: error reconnecting: %s", err.Error())
event = EventConnectionError
} else {
log.Debug("eventLoop: reconnected")
d.connectionEstChan <- MenderShellDaemonEvent{
event: EventConnectionEstablished,
}
log.Infof("eventLoop: Connection established with %s", d.serverUrl)
event = EventConnectionEstablished
}
d.connectionEstChan <- MenderShellDaemonEvent{
event: event,
}
}
}
Expand Down Expand Up @@ -399,49 +440,26 @@ func (d *MenderShellDaemon) Run() error {
//new dbus client
client, err := mender.NewAuthClient(dbusAPI)
if err != nil {
log.Errorf("mender-shall dbus failed to create client, error: %s", err.Error())
log.Errorf("mender-connect dbus failed to create client, error: %s", err.Error())
return err
}

//connection to dbus
err = client.Connect(mender.DBusObjectName, mender.DBusObjectPath, mender.DBusInterfaceName)
if err != nil {
log.Errorf("mender-shall dbus failed to connect, error: %s", err.Error())
log.Errorf("mender-connect dbus failed to connect, error: %s", err.Error())
return err
}

jwtToken, serverURL, err := client.GetJWTToken()
if err != nil {
log.Warnf("call to GetJWTToken on the Mender D-Bus API failed: %v", err)
}
d.serverJwt = jwtToken
d.serverUrl = serverURL

log.Debugf("GetJWTToken().len=%d", len(jwtToken))
if len(jwtToken) < 1 {
log.Info("waiting for JWT token (waitForJWTToken)")
jwtToken, err = d.waitForJWTToken(client)
if err != nil {
return err
}
d.authorized = true
} else {
if len(d.serverJwt) > 0 && len(d.serverUrl) > 0 {
d.authorized = true
}
log.Debugf("mender-connect got len(JWT)=%d", len(jwtToken))

err = connectionmanager.Connect(ws.ProtoTypeShell,
d.serverUrl,
d.deviceConnectUrl,
jwtToken,
d.skipVerify,
d.serverCertificate,
0,
d.ctx,
)
if err != nil {
log.Errorf("error on connecting, probably interrupted: %s", err.Error())
return err
}

go d.messageLoop()
go d.dbusEventLoop(client)
Expand Down

0 comments on commit 0afe7b7

Please sign in to comment.