Skip to content

Commit

Permalink
[FIXED] Possible delays in delivering messages
Browse files Browse the repository at this point in the history
There is a possibility that a partial write results in data
not being sent in a timely fashion to a subscription.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 3, 2019
1 parent ae80c4e commit 42f45ce
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
13 changes: 12 additions & 1 deletion server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type outbound struct {
pb int64 // Total pending/queued bytes.
pm int64 // Total pending/queued messages.
sg *sync.Cond // Flusher conditional for signaling.
sgw bool // Indicate flusher is waiting on condition wait.
wdl time.Duration // Snapshot fo write deadline.
mp int64 // snapshot of max pending.
fsp int // Flush signals that are pending from readLoop's pcd.
Expand Down Expand Up @@ -631,7 +632,9 @@ func (c *client) writeLoop() {
c.mu.Lock()
if waitOk && (c.out.pb == 0 || c.out.fsp > 0) && len(c.out.nb) == 0 && !c.flags.isSet(clearConnection) {
// Wait on pending data.
c.out.sgw = true
c.out.sg.Wait()
c.out.sgw = false
}
// Flush data
waitOk = c.flushOutbound()
Expand Down Expand Up @@ -917,13 +920,21 @@ func (c *client) flushOutbound() bool {
}
}

// Check that if there is still data to send and writeLoop is in wait,
// we need to signal
if c.out.pb > 0 && c.out.sgw {
c.out.sg.Signal()
}

return true
}

// flushSignal will use server to queue the flush IO operation to a pool of flushers.
// Lock must be held.
func (c *client) flushSignal() {
c.out.sg.Signal()
if c.out.sgw {
c.out.sg.Signal()
}
}

func (c *client) traceMsg(msg []byte) {
Expand Down
63 changes: 63 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,3 +1022,66 @@ func TestGetRandomIP(t *testing.T) {
}
}
}

type slowWriteConn struct {
net.Conn
}

func (swc *slowWriteConn) Write(b []byte) (int, error) {
// Limit the write to 10 bytes at a time.
max := len(b)
if max > 10 {
max = 10
}
return swc.Conn.Write(b[:max])
}

func TestClientWriteLoopStall(t *testing.T) {
opts := DefaultOptions()
s := RunServer(opts)
defer s.Shutdown()

errCh := make(chan error, 1)

url := fmt.Sprintf("nats://%s:%d", opts.Host, opts.Port)
nc, err := nats.Connect(url,
nats.ErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, e error) {
select {
case errCh <- e:
default:
}
}))
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer nc.Close()
sub, err := nc.SubscribeSync("foo")
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
cid, _ := nc.GetClientID()

sender, err := nats.Connect(url)
if err != nil {
t.Fatalf("Error on connect: %v", err)
}
defer sender.Close()

c := s.getClient(cid)
c.mu.Lock()
c.nc = &slowWriteConn{Conn: c.nc}
c.mu.Unlock()

sender.Publish("foo", make([]byte, 100))

if _, err := sub.NextMsg(3 * time.Second); err != nil {
t.Fatalf("WriteLoop has stalled!")
}

// Make sure that we did not get any async error
select {
case e := <-errCh:
t.Fatalf("Got error: %v", e)
case <-time.After(250 * time.Millisecond):
}
}

0 comments on commit 42f45ce

Please sign in to comment.