Skip to content

Commit

Permalink
set read and write deadlines web sockets so nothing blocks forever
Browse files Browse the repository at this point in the history
  • Loading branch information
skaes committed Aug 9, 2023
1 parent d163784 commit c840b87
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
5 changes: 4 additions & 1 deletion go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (s *ClientState) Connect() (err error) {
// Close sends a Close message to the server and closed the connection.
func (s *ClientState) Close() {
defer s.ws.Close()
s.ws.SetWriteDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
err := s.ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
logError("writing websocket close failed: %s", err)
Expand All @@ -102,6 +103,7 @@ func (s *ClientState) send(msg MsgBody) error {
return err
}
logDebug("sending message")
s.ws.SetWriteDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
err = s.ws.WriteMessage(websocket.TextMessage, b)
if err != nil {
logError("could not send message: %s", err)
Expand Down Expand Up @@ -253,6 +255,7 @@ func (s *ClientState) Reader() {
default:
}
logDebug("reading message")
s.ws.SetReadDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
msgType, bytes, err := s.ws.ReadMessage()
atomic.AddInt64(&processed, 1)
if err != nil || msgType != websocket.TextMessage {
Expand All @@ -273,7 +276,7 @@ func (s *ClientState) Reader() {
// Writer reads messages from an internal channel and dispatches them. It
// periodically sends a HEARTBEAT message to the server. It if receives a config
// change message, it replaces the current config with the new one. If the
// config change implies that the server URL has changed it exits, relying on
// config change implies that the server URL has changed, it exits, relying on
// the outer loop to restart the client.
func (s *ClientState) Writer() {
ticker := time.NewTicker(1 * time.Second)
Expand Down
2 changes: 2 additions & 0 deletions go/notification_mailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (s *MailerState) Connect() (err error) {
// Close sends a Close message on the websocket and closes it.
func (s *MailerState) Close() {
defer s.ws.Close()
s.ws.SetWriteDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
err := s.ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
logError("writing websocket close failed: %s", err)
Expand Down Expand Up @@ -89,6 +90,7 @@ func (s *MailerState) SendMail(text string) {
func (s *MailerState) Reader() {
for !interrupted {
logDebug("reading message")
s.ws.SetReadDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
msgType, bytes, err := s.ws.ReadMessage()
if err != nil || msgType != websocket.TextMessage {
logError("error reading from server socket: %s", err)
Expand Down
18 changes: 13 additions & 5 deletions go/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ func (s *ServerState) notificationReader(ws *websocket.Conn) {
s.wsChannel <- &WsMsg{body: MsgBody{Name: START_NOTIFY}, channel: dispatcherInput}
go s.notificationWriter(ws, dispatcherInput)
for !interrupted {
ws.SetReadDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
msgType, bytes, err := ws.ReadMessage()
if err != nil || msgType != websocket.TextMessage {
logError("notificationReader: could not read msg: %s", err)
Expand All @@ -570,6 +571,7 @@ func (s *ServerState) notificationWriter(ws *websocket.Conn, inputFromDispatcher
logInfo("Terminating notification websocket writer")
return
}
ws.SetWriteDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
ws.WriteMessage(websocket.TextMessage, []byte(data))
case <-time.After(100 * time.Millisecond):
// give the outer loop a chance to detect interrupts (without doing a busy wait)
Expand Down Expand Up @@ -694,6 +696,7 @@ func (s *ServerState) wsReader(ws *websocket.Conn) {
var body MsgBody

for !interrupted {
ws.SetReadDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
msgType, bytes, err := ws.ReadMessage()
atomic.AddInt64(&processed, 1)
if err != nil || msgType != websocket.TextMessage {
Expand All @@ -720,14 +723,18 @@ func (s *ServerState) wsReader(ws *websocket.Conn) {
func (s *ServerState) wsWriter(clientID string, ws *websocket.Conn, inputFromDispatcher chan string) {
s.waitGroup.Add(1)
defer s.waitGroup.Done()
defer ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(1000, "good bye"))
defer func() {
ws.SetWriteDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
ws.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(1000, "good bye"))
}()
for !interrupted {
select {
case data, ok := <-inputFromDispatcher:
if !ok {
logInfo("Closed channel for %s", clientID)
return
}
ws.SetWriteDeadline(time.Now().Add(websocket.DefaultDialer.HandshakeTimeout))
ws.WriteMessage(websocket.TextMessage, []byte(data))
case <-time.After(100 * time.Millisecond):
// give the outer loop a chance to detect interrupts
Expand All @@ -738,11 +745,12 @@ func (s *ServerState) wsWriter(clientID string, ws *websocket.Conn, inputFromDis
// Initialize completes the state initialization by checking redis connectivity
// and loading saved state.
func (s *ServerState) Initialize() {
path := s.GetConfig().RedisMasterFile
VerifyMasterFileString(path)
config := s.GetConfig()
websocket.DefaultDialer.HandshakeTimeout = time.Duration(config.DialTimeout) * time.Second
VerifyMasterFileString(config.RedisMasterFile)
var masters map[string]string
if MasterFileExists(path) {
masters = RedisMastersFromMasterFile(path)
if MasterFileExists(config.RedisMasterFile) {
masters = RedisMastersFromMasterFile(config.RedisMasterFile)
} else if s.opts.ConsulClient != nil {
kv, err := s.opts.ConsulClient.GetState()
if err != nil {
Expand Down

0 comments on commit c840b87

Please sign in to comment.