Skip to content

Commit

Permalink
[FIXED] Data races websocket/shutdown, DisconnectClientByID and LDMClientByID (#5398)
Browse files Browse the repository at this point in the history

The race detector reported a data race on Travis while a websocket test was shutting down the server:
```
==================
WARNING: DATA RACE
Write at 0x00c004430618 by goroutine 112330:
  github.com/nats-io/nats-server/v2/server.(*Server).Shutdown()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/server.go:2527 +0x87e
  github.com/nats-io/nats-server/v2/server.TestWSTLSConnection.deferwrap1()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket_test.go:2223 +0x33
  runtime.deferreturn()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/runtime/panic.go:602 +0x5d
  testing.tRunner()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1742 +0x44
Previous read at 0x00c004430618 by goroutine 112339:
  github.com/nats-io/nats-server/v2/server.(*client).generateClientInfoJSON()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/client.go:2389 +0x406
  github.com/nats-io/nats-server/v2/server.(*Server).createWSClient()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket.go:1280 +0xb24
  github.com/nats-io/nats-server/v2/server.(*Server).startWebsocketServer.func1()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket.go:1184 +0x1db
  net/http.HandlerFunc.ServeHTTP()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:2166 +0x47
  net/http.(*ServeMux).ServeHTTP()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:2683 +0x1ef
  net/http.serverHandler.ServeHTTP()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:3137 +0x2a1
  net/http.(*conn).serve()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:2039 +0x13c4
  net/http.(*Server).Serve.gowrap3()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:3285 +0x4f
Goroutine 112330 (running) created at:
  testing.(*T).Run()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1742 +0x825
  testing.runTests.func1()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:2161 +0x85
  testing.tRunner()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1689 +0x21e
  testing.runTests()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:2159 +0x8be
  testing.(*M).Run()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:2027 +0xf17
  github.com/nats-io/nats-server/v2/server.TestMain()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/sublist_test.go:1602 +0x416
  main.main()
      _testmain.go:2281 +0x2d4
Goroutine 112339 (running) created at:
  net/http.(*Server).Serve()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:3285 +0x8ec
  github.com/nats-io/nats-server/v2/server.(*Server).startWebsocketServer.func2()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket.go:1208 +0x5b
==================

==================
WARNING: DATA RACE
Write at 0x00c004430620 by goroutine 112330:
  github.com/nats-io/nats-server/v2/server.(*Server).Shutdown()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/server.go:2528 +0x8c4
  github.com/nats-io/nats-server/v2/server.TestWSTLSConnection.deferwrap1()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket_test.go:2223 +0x33
  runtime.deferreturn()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/runtime/panic.go:602 +0x5d
  testing.tRunner()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1742 +0x44
Previous read at 0x00c004430620 by goroutine 112339:
  github.com/nats-io/nats-server/v2/server.(*client).generateClientInfoJSON()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/client.go:2394 +0x506
  github.com/nats-io/nats-server/v2/server.(*Server).createWSClient()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket.go:1280 +0xb24
  github.com/nats-io/nats-server/v2/server.(*Server).startWebsocketServer.func1()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket.go:1184 +0x1db
  net/http.HandlerFunc.ServeHTTP()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:2166 +0x47
  net/http.(*ServeMux).ServeHTTP()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:2683 +0x1ef
  net/http.serverHandler.ServeHTTP()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:3137 +0x2a1
  net/http.(*conn).serve()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:2039 +0x13c4
  net/http.(*Server).Serve.gowrap3()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:3285 +0x4f
Goroutine 112330 (running) created at:
  testing.(*T).Run()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1742 +0x825
  testing.runTests.func1()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:2161 +0x85
  testing.tRunner()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:1689 +0x21e
  testing.runTests()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:2159 +0x8be
  testing.(*M).Run()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/testing/testing.go:2027 +0xf17
  github.com/nats-io/nats-server/v2/server.TestMain()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/sublist_test.go:1602 +0x416
  main.main()
      _testmain.go:2281 +0x2d4
Goroutine 112339 (running) created at:
  net/http.(*Server).Serve()
      /home/travis/.gimme/versions/go1.22.2.linux.amd64/src/net/http/server.go:3285 +0x8ec
  github.com/nats-io/nats-server/v2/server.(*Server).startWebsocketServer.func2()
      /home/travis/gopath/src/github.com/nats-io/nats-server/server/websocket.go:1208 +0x5b
==================

```

Code inspection then revealed possible data races in
`DisconnectClientByID` and `LDMClientByID` as well. After extending the test
`TestServerEventsLDMKick` to exercise them, the race detector reported the following:
```
==================
WARNING: DATA RACE
Write at 0x00c000099320 by goroutine 491:
  runtime.mapaccess2_fast64()
      /usr/local/go/src/runtime/map_fast64.go:53 +0x1cc
  github.com/nats-io/nats-server/v2/server.(*Server).createClientEx()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3191 +0x9cc
  github.com/nats-io/nats-server/v2/server.(*Server).createClient()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3064 +0x40
  github.com/nats-io/nats-server/v2/server.(*Server).AcceptLoop.func2()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:2682 +0x18
  github.com/nats-io/nats-server/v2/server.(*Server).acceptConnections.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:2734 +0x70
  github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3785 +0x58

Previous read at 0x00c000099320 by goroutine 473:
  runtime.evacuate_fast32()
      /usr/local/go/src/runtime/map_fast32.go:374 +0x38c
  github.com/nats-io/nats-server/v2/server.(*Server).LDMClientByID()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:4457 +0x208
  github.com/nats-io/nats-server/v2/server.(*Server).ldmClient.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:2823 +0x44
  github.com/nats-io/nats-server/v2/server.(*Server).zReq()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:1935 +0x1bc
  github.com/nats-io/nats-server/v2/server.(*Server).ldmClient()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:2822 +0x230
  github.com/nats-io/nats-server/v2/server.(*Server).ldmClient-fm()
      <autogenerated>:1 +0xa0
  github.com/nats-io/nats-server/v2/server.(*Server).internalReceiveLoop()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:424 +0x380
  github.com/nats-io/nats-server/v2/server.(*Server).setSystemAccount.func5()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:1755 +0x34

Goroutine 491 (running) created at:
  github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3783 +0x160
  github.com/nats-io/nats-server/v2/server.(*Server).acceptConnections()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:2732 +0x154
  github.com/nats-io/nats-server/v2/server.(*Server).AcceptLoop.func9()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:2682 +0x70

Goroutine 473 (running) created at:
  github.com/nats-io/nats-server/v2/server.(*Server).setSystemAccount()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:1755 +0x654
  github.com/nats-io/nats-server/v2/server.TestServerEventsLDMKick()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events_test.go:3433 +0xc4
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1595 +0x1b0
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1648 +0x40
==================
```

```
==================
WARNING: DATA RACE
Write at 0x00c00015d7a0 by goroutine 3115:
  runtime.mapassign_fast64ptr()
      /usr/local/go/src/runtime/map_fast64.go:183 +0x35c
  github.com/nats-io/nats-server/v2/server.(*Server).removeClient()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3474 +0x114
  github.com/nats-io/nats-server/v2/server.(*client).closeConnection()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/client.go:5468 +0x3d4
  github.com/nats-io/nats-server/v2/server.(*client).authViolation()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/client.go:2214 +0x4e8
  github.com/nats-io/nats-server/v2/server.(*client).processConnect()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/client.go:2104 +0xbcc
  github.com/nats-io/nats-server/v2/server.(*client).parse()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/parser.go:942 +0xe90
  github.com/nats-io/nats-server/v2/server.(*client).readLoop()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/client.go:1388 +0x13a4
  github.com/nats-io/nats-server/v2/server.(*Server).createClientEx.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3300 +0x50
  github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3785 +0x58

Previous read at 0x00c00015d7a0 by goroutine 3096:
  runtime.evacuate_fast32()
      /usr/local/go/src/runtime/map_fast32.go:374 +0x38c
  github.com/nats-io/nats-server/v2/server.(*Server).DisconnectClientByID()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:4444 +0x4c
  github.com/nats-io/nats-server/v2/server.(*Server).kickClient.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:2805 +0x44
  github.com/nats-io/nats-server/v2/server.(*Server).zReq()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:1935 +0x1bc
  github.com/nats-io/nats-server/v2/server.(*Server).kickClient()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:2804 +0x230
  github.com/nats-io/nats-server/v2/server.(*Server).kickClient-fm()
      <autogenerated>:1 +0xa0
  github.com/nats-io/nats-server/v2/server.(*Server).internalReceiveLoop()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events.go:424 +0x380
  github.com/nats-io/nats-server/v2/server.(*Server).setSystemAccount.func5()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:1755 +0x34

Goroutine 3115 (running) created at:
  github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3783 +0x160
  github.com/nats-io/nats-server/v2/server.(*Server).createClientEx()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3300 +0x13e4
  github.com/nats-io/nats-server/v2/server.(*Server).createClient()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3064 +0x40
  github.com/nats-io/nats-server/v2/server.(*Server).AcceptLoop.func2()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:2682 +0x18
  github.com/nats-io/nats-server/v2/server.(*Server).acceptConnections.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:2734 +0x70
  github.com/nats-io/nats-server/v2/server.(*Server).startGoRoutine.func1()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:3785 +0x58

Goroutine 3096 (running) created at:
  github.com/nats-io/nats-server/v2/server.(*Server).setSystemAccount()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/server.go:1755 +0x654
  github.com/nats-io/nats-server/v2/server.TestServerEventsLDMKick()
      /Users/ivan/dev/go/src/github.com/nats-io/nats-server/server/events_test.go:3433 +0xc4
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1595 +0x1b0
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1648 +0x40
==================
```

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
derekcollison committed May 8, 2024
2 parents d1a1b9b + 58b8662 commit a895101
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
28 changes: 22 additions & 6 deletions server/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3459,10 +3459,28 @@ func TestServerEventsLDMKick(t *testing.T) {
reqkick := KickClientReq{CID: cid}
reqkickpayload, _ := json.Marshal(reqkick)

_, err = ncs.Request(fmt.Sprintf("$SYS.REQ.SERVER.%s.LDM", s.ID()), reqldmpayload, time.Second)
if err != nil {
t.Fatalf("Error trying to publish the LDM request: %v", err)
// Test for data races when getting the client by ID
uc := createUserCreds(t, s, akp2)
totalClients := 100
someClients := make([]*nats.Conn, 0, totalClients)
for i := 0; i < totalClients; i++ {
nc, err := nats.Connect(s.ClientURL(), uc)
require_NoError(t, err)
defer nc.Close()
someClients = append(someClients, nc)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < totalClients; i++ {
someClients[i].Close()
}
}()
defer wg.Wait()

_, err = ncs.Request(fmt.Sprintf("$SYS.REQ.SERVER.%s.LDM", s.ID()), reqldmpayload, time.Second)
require_NoError(t, err)

select {
case <-ldmed:
Expand All @@ -3471,9 +3489,7 @@ func TestServerEventsLDMKick(t *testing.T) {
}

_, err = ncs.Request(fmt.Sprintf("$SYS.REQ.SERVER.%s.KICK", s.ID()), reqkickpayload, time.Second)
if err != nil {
t.Fatalf("Error trying to publish the KICK request: %v", err)
}
require_NoError(t, err)

select {
case <-disconnected:
Expand Down
74 changes: 44 additions & 30 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2521,12 +2521,7 @@ func (s *Server) Shutdown() {
}

// Kick websocket server
if s.websocket.server != nil {
doneExpected++
s.websocket.server.Close()
s.websocket.server = nil
s.websocket.listener = nil
}
doneExpected += s.closeWebsocketServer()

// Kick MQTT accept loop
if s.mqtt.listener != nil {
Expand Down Expand Up @@ -2614,6 +2609,24 @@ func (s *Server) Shutdown() {
close(s.shutdownComplete)
}

// closeWebsocketServer stops the websocket HTTP server when one is running.
// It returns 1 if a server was closed and 0 if there was none, so callers
// can add the result directly to their count of expected completions.
// Server lock held on entry.
func (s *Server) closeWebsocketServer() int {
	// Detach the server and listener while holding the websocket lock so
	// that the fields are never read and cleared at the same time.
	s.websocket.mu.Lock()
	srv := s.websocket.server
	if srv == nil {
		// Nothing is running; leave the listener field untouched.
		s.websocket.mu.Unlock()
		return 0
	}
	s.websocket.server, s.websocket.listener = nil, nil
	s.websocket.mu.Unlock()

	// Close is invoked only after the websocket lock has been released.
	srv.Close()
	return 1
}

// WaitForShutdown will block until the server has been fully shutdown.
func (s *Server) WaitForShutdown() {
<-s.shutdownComplete
Expand Down Expand Up @@ -4133,12 +4146,7 @@ func (s *Server) lameDuckMode() {
expected := 1
s.listener.Close()
s.listener = nil
if s.websocket.server != nil {
expected++
s.websocket.server.Close()
s.websocket.server = nil
s.websocket.listener = nil
}
expected += s.closeWebsocketServer()
s.ldmCh = make(chan bool, expected)
opts := s.getOpts()
gp := opts.LameDuckGracePeriod
Expand Down Expand Up @@ -4439,8 +4447,10 @@ func (s *Server) changeRateLimitLogInterval(d time.Duration) {

// DisconnectClientByID disconnects a client by connection ID
func (s *Server) DisconnectClientByID(id uint64) error {
client := s.clients[id]
if client != nil {
if s == nil {
return ErrServerNotRunning
}
if client := s.getClient(id); client != nil {
client.closeConnection(Kicked)
return nil
}
Expand All @@ -4449,23 +4459,27 @@ func (s *Server) DisconnectClientByID(id uint64) error {

// LDMClientByID sends a Lame Duck Mode info message to a client by connection ID
func (s *Server) LDMClientByID(id uint64) error {
if s == nil {
return ErrServerNotRunning
}
s.mu.RLock()
c := s.clients[id]
if c == nil {
s.mu.RUnlock()
return errors.New("no such client id")
}
info := s.copyInfo()
info.LameDuckMode = true

c := s.clients[id]
if c != nil {
c.mu.Lock()
defer c.mu.Unlock()
if c.opts.Protocol >= ClientProtoInfo &&
c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.Debugf("sending Lame Duck Mode info to client")
c.enqueueProto(c.generateClientInfoJSON(info))
return nil
} else {
return errors.New("ClientProtoInfo < ClientOps.Protocol or first pong not sent")
}
s.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
if c.opts.Protocol >= ClientProtoInfo && c.flags.isSet(firstPongSent) {
// sendInfo takes care of checking if the connection is still
// valid or not, so don't duplicate tests here.
c.Debugf("Sending Lame Duck Mode info to client")
c.enqueueProto(c.generateClientInfoJSON(info))
return nil
} else {
return errors.New("client does not support Lame Duck Mode or is not ready to receive the notification")
}
return errors.New("no such client id")
}

0 comments on commit a895101

Please sign in to comment.