Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Replicator FD leak: make sure client side keeps reading until it gets…
Browse files Browse the repository at this point in the history
… an error
  • Loading branch information
datoug committed Aug 3, 2017
1 parent 123b525 commit b1e8362
Showing 1 changed file with 18 additions and 8 deletions.
26 changes: 18 additions & 8 deletions services/replicator/outconn.go
Expand Up @@ -111,8 +111,6 @@ func (conn *outConnection) close() {
}

func (conn *outConnection) writeCreditsStream() {
defer conn.stream.Done()

if err := conn.sendCredits(initialCreditSize); err != nil {
conn.logger.Error(`error writing initial credits`)

Expand Down Expand Up @@ -152,11 +150,15 @@ func (conn *outConnection) readMsgStream() {
var sealMsgRead bool
var numMsgsRead int32

// Note we must continue read until we hit an error before returning from this function
// Because the websocket client only tear down the underlying connection when it gets a read error
readloop:
for {
rmc, err := conn.stream.Read()
if err != nil {
conn.logger.WithField(common.TagErr, err).Error(`Error reading msg`)
go conn.close()
conn.stream.Done()
return
}

Expand All @@ -169,7 +171,8 @@ func (conn *outConnection) readMsgStream() {
"seqNum": msg.Message.GetSequenceNumber(),
}).Error("regular message read after seal message")
go conn.close()
return
conn.stream.Done()
continue readloop
}

// Sequence number check to make sure we get monotonically increasing sequence number.
Expand All @@ -181,7 +184,8 @@ func (conn *outConnection) readMsgStream() {
"expectedSeqNum": expectedSeqNum,
}).Error("sequence number out of order")
go conn.close()
return
conn.stream.Done()
continue readloop
}

// update the lastSeqNum to this value
Expand All @@ -198,7 +202,8 @@ func (conn *outConnection) readMsgStream() {
atomic.StoreInt64(&conn.lastMsgReplicatedTime, time.Now().UnixNano())
case <-conn.closeChannel:
conn.logger.Info(`writing msg to the channel failed because of shutdown`)
return
conn.stream.Done()
continue readloop
}

case store.ReadMessageContentType_SEALED:
Expand All @@ -215,15 +220,20 @@ func (conn *outConnection) readMsgStream() {
atomic.StoreInt64(&conn.lastMsgReplicatedTime, time.Now().UnixNano())
case <-conn.closeChannel:
conn.logger.Info(`writing msg to the channel failed because of shutdown`)
return
conn.stream.Done()
continue readloop
}

return
conn.stream.Done()
continue readloop

case store.ReadMessageContentType_ERROR:
msgErr := rmc.GetError()
conn.logger.WithField(`Message`, msgErr.GetMessage()).Error(`received error from reading msg`)
go conn.close()
return
conn.stream.Done()
continue readloop

default:
conn.logger.WithField(`Type`, rmc.GetType()).Error(`received ReadMessageContent with unrecognized type`)
}
Expand Down

0 comments on commit b1e8362

Please sign in to comment.