Skip to content

Commit

Permalink
Send a close message over the websocket when closing a session (#3914)
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Plourde committed Jul 16, 2020
1 parent 89a0d0f commit d5e039f
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 4 deletions.
30 changes: 26 additions & 4 deletions backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ import (
"github.com/sirupsen/logrus"
)

const deletedEventSentinel = -1
const (
deletedEventSentinel = -1

// Time to wait before force close on connection.
closeGracePeriod = 10 * time.Second
)

var (
sessionCounter = prometheus.NewGaugeVec(
Expand Down Expand Up @@ -272,8 +277,6 @@ func (s *Session) sender() {
// Handle the delete and unknown watch events
switch watchEvent.Action {
case store.WatchDelete:
// The entity was deleted, we should sever the connection to the agent
// so it can register back
s.cancel()
continue
case store.WatchUnknown:
Expand Down Expand Up @@ -427,8 +430,27 @@ func (s *Session) stop() {
}
}()

// Send a close message to ensure the agent closes its connection if the
// connection is not already closed
if !s.conn.Closed() {
if err := s.conn.SendCloseMessage(); err != nil {
logger.Warning("unexpected error while sending a close message to the agent")
}
}

sessionCounter.WithLabelValues(s.cfg.Namespace).Dec()
s.wg.Wait()

// Gracefully wait for the send and receiver to exit, but force the websocket
// connection to close itself after the grace period
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(closeGracePeriod):
}

close(s.entityConfig.updatesChannel)
close(s.checkChannel)
Expand Down
70 changes: 70 additions & 0 deletions backend/agentd/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/sensu/sensu-go/backend/messaging"
"github.com/sensu/sensu-go/backend/store"
"github.com/sensu/sensu-go/backend/store/v2/storetest"
"github.com/sensu/sensu-go/handler"
"github.com/sensu/sensu-go/testing/mockbus"
"github.com/sensu/sensu-go/testing/mockstore"
"github.com/sensu/sensu-go/testing/mocktransport"
Expand Down Expand Up @@ -98,6 +99,7 @@ func TestSession(t *testing.T) {
// Close our wait channel once we asserted the message
wg.Done()
}).Return(nil)
conn.On("Closed").Return(true)
conn.On("Close").Return(nil)
},
busFunc: func(bus *messaging.WizardBus, wg *sync.WaitGroup) {
Expand All @@ -112,6 +114,8 @@ func TestSession(t *testing.T) {
name: "delete watch event stops the agent session",
connFunc: func(conn *mocktransport.MockTransport, wg *sync.WaitGroup) {
conn.On("Receive").After(100*time.Millisecond).Return(&transport.Message{}, nil)
conn.On("Closed").Return(false)
conn.On("SendCloseMessage").Return(nil)
conn.On("Close").Return(nil)
},
busFunc: func(bus *messaging.WizardBus, wg *sync.WaitGroup) {
Expand All @@ -129,6 +133,7 @@ func TestSession(t *testing.T) {
// The Send() method should only be called once, otherwise it means the
// unknown event also sent something
conn.On("Send", mock.Anything).Once().Return(transport.ClosedError{})
conn.On("Closed").Return(true)
conn.On("Close").Return(nil)
},
busFunc: func(bus *messaging.WizardBus, wg *sync.WaitGroup) {
Expand All @@ -152,6 +157,7 @@ func TestSession(t *testing.T) {
name: "invalid class entities are reset to the agent class",
connFunc: func(conn *mocktransport.MockTransport, wg *sync.WaitGroup) {
conn.On("Receive").After(100*time.Millisecond).Return(&transport.Message{}, nil)
conn.On("Closed").Return(true)
conn.On("Close").Return(nil)
},
busFunc: func(bus *messaging.WizardBus, wg *sync.WaitGroup) {
Expand All @@ -176,6 +182,7 @@ func TestSession(t *testing.T) {
connFunc: func(conn *mocktransport.MockTransport, wg *sync.WaitGroup) {
conn.On("Receive").After(100*time.Millisecond).Return(&transport.Message{}, nil)
conn.On("Send", mock.Anything).Return(transport.ConnectionError{Message: "some horrible network outage"})
conn.On("Closed").Return(true)
conn.On("Close").Return(nil)
},
busFunc: func(bus *messaging.WizardBus, wg *sync.WaitGroup) {
Expand All @@ -202,6 +209,7 @@ func TestSession(t *testing.T) {
wg.Done()
}
}).Return(nil)
conn.On("Closed").Return(true)
conn.On("Close").Return(nil)
},
busFunc: func(bus *messaging.WizardBus, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -239,6 +247,7 @@ func TestSession(t *testing.T) {
t.Fatalf("did not expect to receive a message of type %s", corev2.CheckRequestType)
}
}).Return(nil)
conn.On("Closed").Return(true)
conn.On("Close").Return(nil)
},
busFunc: func(bus *messaging.WizardBus, wg *sync.WaitGroup) {
Expand Down Expand Up @@ -610,3 +619,64 @@ func Test_removeEmptySubscriptions(t *testing.T) {
})
}
}

func TestSession_receiver(t *testing.T) {
type connFunc func(*mocktransport.MockTransport, context.CancelFunc)

tests := []struct {
name string
connFunc connFunc
}{
{
name: "incoming messages are handled",
connFunc: func(conn *mocktransport.MockTransport, cancel context.CancelFunc) {
conn.On("Receive").Once().Return(&transport.Message{}, nil)
conn.On("Receive").Once().Run(func(args mock.Arguments) {
cancel()
}).Return(&transport.Message{}, nil)
},
},
{
name: "random errors are handled",
connFunc: func(conn *mocktransport.MockTransport, cancel context.CancelFunc) {
conn.On("Receive").Once().Return(&transport.Message{}, errors.New("error"))
conn.On("Receive").Once().Run(func(args mock.Arguments) {
cancel()
}).Return(&transport.Message{}, nil)
},
},
{
name: "transport errors are handled",
connFunc: func(conn *mocktransport.MockTransport, cancel context.CancelFunc) {
conn.On("Receive").Once().Return(&transport.Message{}, transport.ConnectionError{})
conn.On("Receive").Once().Run(func(args mock.Arguments) {
cancel()
}).Return(&transport.Message{}, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
conn := new(mocktransport.MockTransport)
if tt.connFunc != nil {
tt.connFunc(conn, cancel)
}

s := &Session{
cfg: SessionConfig{
WriteTimeout: 5,
},
conn: conn,
ctx: ctx,
cancel: cancel,
wg: &sync.WaitGroup{},
}
s.wg.Add(1)
s.handler = handler.NewMessageHandler()
go s.receiver()

s.wg.Wait()
})
}
}
7 changes: 7 additions & 0 deletions testing/mocktransport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ func (m *MockTransport) Send(message *transport.Message) error {
return args.Error(0)
}

// SendCloseMessage ...
func (m *MockTransport) SendCloseMessage() error {
args := m.Called()
return args.Error(0)
}

// Heartbeat ...
func (m *MockTransport) Heartbeat(ctx context.Context, interval, timeout int) {
_ = m.Called(ctx, interval, timeout)
}
10 changes: 10 additions & 0 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ type Transport interface {
// sent. Send is synchronous, returning nil if the write to the underlying
// socket was successful and an error otherwise.
Send(*Message) error

// SendCloseMessage sends a close control message over the transport, and the
// peer should echo the message back and that message will be returned as an
// error from the websocket connection's read API
SendCloseMessage() error
}

// A WebSocketTransport is a connection between sensu Agents and Backends over
Expand Down Expand Up @@ -289,3 +294,8 @@ func (t *WebSocketTransport) Send(m *Message) (err error) {

return nil
}

// SendCloseMessage sends a close control message over the transport
func (t *WebSocketTransport) SendCloseMessage() (err error) {
return t.Connection.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
}

0 comments on commit d5e039f

Please sign in to comment.