Skip to content

Commit

Permalink
*: use CompareAndSwap instead of CAS for atomics
Browse files Browse the repository at this point in the history
go.uber.org/atomic deprecated CAS methods in version 1.10 (that introduced
CompareAndSwap), so we need to fix it.

Signed-off-by: Roman Khimov <roman@nspcc.ru>
  • Loading branch information
roman-khimov committed Apr 27, 2023
1 parent 2567c4c commit 8342bde
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 20 deletions.
4 changes: 2 additions & 2 deletions pkg/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (s *service) Name() string {
}

func (s *service) Start() {
if s.started.CAS(false, true) {
if s.started.CompareAndSwap(false, true) {
s.log.Info("starting consensus service")
b, _ := s.Chain.GetBlock(s.Chain.CurrentBlockHash()) // Can't fail, we have some current block!
s.lastTimestamp = b.Timestamp
Expand All @@ -288,7 +288,7 @@ func (s *service) Start() {

// Shutdown implements the Service interface.
func (s *service) Shutdown() {
if s.started.CAS(true, false) {
if s.started.CompareAndSwap(true, false) {
s.log.Info("stopping consensus service")
close(s.quit)
<-s.finished
Expand Down
2 changes: 1 addition & 1 deletion pkg/network/bqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (bq *Queue) LastQueued() (uint32, int) {

// Discard stops the queue and prevents it from accepting more blocks to enqueue.
func (bq *Queue) Discard() {
if bq.discarded.CAS(false, true) {
if bq.discarded.CompareAndSwap(false, true) {
bq.queueLock.Lock()
close(bq.checkBlocks)
// Technically we could bq.queue = nil, but this would cost
Expand Down
6 changes: 3 additions & 3 deletions pkg/network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func (s *Server) tryStartServices() {
return
}

if s.IsInSync() && s.syncReached.CAS(false, true) {
if s.IsInSync() && s.syncReached.CompareAndSwap(false, true) {
s.log.Info("node reached synchronized state, starting services")
if s.chain.P2PSigExtensionsEnabled() {
s.notaryRequestPool.RunSubscriptions() // WSClient is also a subscriber.
Expand Down Expand Up @@ -1277,14 +1277,14 @@ func getRequestBlocksPayload(p Peer, currHeight uint32, lastRequestedHeight *ato
old := lastRequestedHeight.Load()
if old <= currHeight {
needHeight = currHeight + 1
if !lastRequestedHeight.CAS(old, needHeight) {
if !lastRequestedHeight.CompareAndSwap(old, needHeight) {
continue
}
} else if old < currHeight+(bqueue.CacheSize-payload.MaxHashesCount) {
needHeight = currHeight + 1
if peerHeight > old+payload.MaxHashesCount {
needHeight = old + payload.MaxHashesCount
if !lastRequestedHeight.CAS(old, needHeight) {
if !lastRequestedHeight.CompareAndSwap(old, needHeight) {
continue
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpcclient/wsclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func NewWS(ctx context.Context, endpoint string, opts WSOptions) (*WSClient, err
// Close closes connection to the remote side rendering this client instance
// unusable.
func (c *WSClient) Close() {
if c.closeCalled.CAS(false, true) {
if c.closeCalled.CompareAndSwap(false, true) {
c.setCloseErr(errConnClosedByUser)
// Closing shutdown channel sends a signal to wsWriter to break out of the
// loop. In doing so it does ws.Close() closing the network connection
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewService(name string, httpServers []*http.Server, cfg config.BasicService
// Start runs http service with the exposed endpoint on the configured port.
func (ms *Service) Start() error {
if ms.config.Enabled {
if !ms.started.CAS(false, true) {
if !ms.started.CompareAndSwap(false, true) {
ms.log.Info("service already started")
return nil
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func (ms *Service) ShutDown() {
if !ms.config.Enabled {
return
}
if !ms.started.CAS(true, false) {
if !ms.started.CompareAndSwap(true, false) {
return
}
for _, srv := range ms.http {
Expand Down
4 changes: 2 additions & 2 deletions pkg/services/notary/notary.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (n *Notary) Name() string {
// Start runs a Notary module in a separate goroutine.
// The Notary only starts once, subsequent calls to Start are no-op.
func (n *Notary) Start() {
if !n.started.CAS(false, true) {
if !n.started.CompareAndSwap(false, true) {
return
}
n.Config.Log.Info("starting notary service")
Expand Down Expand Up @@ -221,7 +221,7 @@ drainLoop:
// to Shutdown on the same instance are no-op. The instance that was stopped can
// not be started again by calling Start (use a new instance if needed).
func (n *Notary) Shutdown() {
if !n.started.CAS(true, false) {
if !n.started.CompareAndSwap(true, false) {
return
}
n.Config.Log.Info("stopping notary service")
Expand Down
14 changes: 7 additions & 7 deletions pkg/services/rpcsrv/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestSubscriptions(t *testing.T) {
for _, id := range subIDs {
callUnsubscribe(t, c, respMsgs, id)
}
finishedFlag.CAS(false, true)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

Expand Down Expand Up @@ -312,7 +312,7 @@ func TestFilteredSubscriptions(t *testing.T) {

callUnsubscribe(t, c, respMsgs, subID)
callUnsubscribe(t, c, respMsgs, blockSubID)
finishedFlag.CAS(false, true)
finishedFlag.CompareAndSwap(false, true)
c.Close()
})
}
Expand Down Expand Up @@ -408,7 +408,7 @@ func TestFilteredNotaryRequestSubscriptions(t *testing.T) {
callUnsubscribe(t, c, respMsgs, subID)
})
}
finishedFlag.CAS(false, true)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

Expand Down Expand Up @@ -448,7 +448,7 @@ func TestFilteredBlockSubscriptions(t *testing.T) {
require.Equal(t, 3, int(primary))
}
callUnsubscribe(t, c, respMsgs, blockSubID)
finishedFlag.CAS(false, true)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

Expand Down Expand Up @@ -477,7 +477,7 @@ func TestMaxSubscriptions(t *testing.T) {
}
}

finishedFlag.CAS(false, true)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

Expand Down Expand Up @@ -519,7 +519,7 @@ func TestBadSubUnsub(t *testing.T) {
t.Run("subscribe", testF(t, subCases))
t.Run("unsubscribe", testF(t, unsubCases))

finishedFlag.CAS(false, true)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}

Expand Down Expand Up @@ -614,6 +614,6 @@ func TestSubscriptionOverflow(t *testing.T) {
// `Missed` is the last event and there is nothing afterwards.
require.Equal(t, 0, len(respMsgs))

finishedFlag.CAS(false, true)
finishedFlag.CompareAndSwap(false, true)
c.Close()
}
4 changes: 2 additions & 2 deletions pkg/services/stateroot/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *service) Name() string {
// Start runs service instance in a separate goroutine.
// The service only starts once, subsequent calls to Start are no-op.
func (s *service) Start() {
if !s.started.CAS(false, true) {
if !s.started.CompareAndSwap(false, true) {
return
}
s.log.Info("starting state validation service")
Expand Down Expand Up @@ -68,7 +68,7 @@ drainloop:
// to Shutdown on the same instance are no-op. The instance that was stopped can
// not be started again by calling Start (use a new instance if needed).
func (s *service) Shutdown() {
if !s.started.CAS(true, false) {
if !s.started.CompareAndSwap(true, false) {
return
}
s.log.Info("stopping state validation service")
Expand Down

0 comments on commit 8342bde

Please sign in to comment.