Skip to content

Commit

Permalink
chore: replace time.After with time.NewTicker (ChainSafe#1650)
Browse files Browse the repository at this point in the history
* replace time.After with time.NewTimer

* replace time.Affer with time.NewTicker

* lint

* replace time.After is discovery so ttl var is used

* replace time.After in if statement

* add configuration variables for time duration functions
  • Loading branch information
edwardmack authored and timwu20 committed Dec 6, 2021
1 parent 63f73ca commit a84466d
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 27 deletions.
8 changes: 6 additions & 2 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ func (d *discovery) start() error {
// get all currently connected peers and use them to bootstrap the DHT
peers := d.h.Network().Peers()

t := time.NewTicker(startDHTTimeout)
defer t.Stop()
for {
if len(peers) > 0 {
break
}

select {
case <-time.After(startDHTTimeout):
case <-t.C:
logger.Debug("no peers yet, waiting to start DHT...")
// wait for peers to connect before starting DHT, otherwise DHT bootstrap nodes
// will be empty and we will fail to fill the routing table
Expand Down Expand Up @@ -169,11 +171,13 @@ func (d *discovery) advertise() {
}

func (d *discovery) checkPeerCount() {
t := time.NewTicker(connectToPeersTimeout)
defer t.Stop()
for {
select {
case <-d.ctx.Done():
return
case <-time.After(connectToPeersTimeout):
case <-t.C:
if len(d.h.Network().Peers()) > d.minPeers {
continue
}
Expand Down
48 changes: 30 additions & 18 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ const (
badPeerThreshold int = -2
protectedPeerThreshold int = 7

defaultSlotDuration = time.Second * 6
defaultSlotDuration = time.Second * 6
defaultHandleResponseQueueDuration = time.Second
defaultPrunePeersDuration = time.Second * 30
)

var (
Expand Down Expand Up @@ -132,26 +134,30 @@ type syncQueue struct {
goal int64 // goal block number we are trying to sync to
currStart, currEnd int64 // the start and end of the BlockResponse we are currently handling; 0 and 0 if we are not currently handling any

benchmarker *syncBenchmarker
benchmarker *syncBenchmarker
handleResponseQueueDuration time.Duration
prunePeersDuration time.Duration
}

func newSyncQueue(s *Service) *syncQueue {
ctx, cancel := context.WithCancel(s.ctx)

return &syncQueue{
s: s,
slotDuration: defaultSlotDuration,
ctx: ctx,
cancel: cancel,
peerScore: new(sync.Map),
requestData: new(sync.Map),
requestDataByHash: new(sync.Map),
justificationRequestData: new(sync.Map),
requestCh: make(chan *syncRequest, blockRequestBufferSize),
responses: []*types.BlockData{},
responseCh: make(chan []*types.BlockData, blockResponseBufferSize),
benchmarker: newSyncBenchmarker(),
buf: make([]byte, maxBlockResponseSize),
s: s,
slotDuration: defaultSlotDuration,
ctx: ctx,
cancel: cancel,
peerScore: new(sync.Map),
requestData: new(sync.Map),
requestDataByHash: new(sync.Map),
justificationRequestData: new(sync.Map),
requestCh: make(chan *syncRequest, blockRequestBufferSize),
responses: []*types.BlockData{},
responseCh: make(chan []*types.BlockData, blockResponseBufferSize),
benchmarker: newSyncBenchmarker(),
buf: make([]byte, maxBlockResponseSize),
handleResponseQueueDuration: defaultHandleResponseQueueDuration,
prunePeersDuration: defaultPrunePeersDuration,
}
}

Expand All @@ -176,10 +182,12 @@ func (q *syncQueue) syncAtHead() {
q.s.syncer.SetSyncing(true)
q.s.noGossip = true // don't gossip messages until we're at the head

t := time.NewTicker(q.slotDuration * 2)
defer t.Stop()
for {
select {
// sleep for average block time TODO: make this configurable from slot duration
case <-time.After(q.slotDuration * 2):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -214,9 +222,11 @@ func (q *syncQueue) syncAtHead() {
}

func (q *syncQueue) handleResponseQueue() {
t := time.NewTicker(q.handleResponseQueueDuration)
defer t.Stop()
for {
select {
case <-time.After(time.Second):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -260,9 +270,11 @@ func (q *syncQueue) handleResponseQueue() {

// prune peers with low score and connect to new peers
func (q *syncQueue) prunePeers() {
t := time.NewTicker(q.prunePeersDuration)
defer t.Stop()
for {
select {
case <-time.After(time.Second * 30):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down
18 changes: 12 additions & 6 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ type Message struct {

// Handler struct for holding telemetry related things
type Handler struct {
msg chan Message
connections []*telemetryConnection
log log.Logger
msg chan Message
connections []*telemetryConnection
log log.Logger
sendMessageTimeout time.Duration
}

// KeyValue object to hold key value pairs used in telemetry messages
Expand All @@ -56,14 +57,17 @@ var (
handlerInstance *Handler
)

const defaultMessageTimeout = time.Second

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler { //nolint
if handlerInstance == nil {
once.Do(
func() {
handlerInstance = &Handler{
msg: make(chan Message, 256),
log: log.New("pkg", "telemetry"),
msg: make(chan Message, 256),
log: log.New("pkg", "telemetry"),
sendMessageTimeout: defaultMessageTimeout,
}
go handlerInstance.startListening()
})
Expand Down Expand Up @@ -109,10 +113,12 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {

// SendMessage sends Message to connected telemetry listeners
func (h *Handler) SendMessage(msg *Message) error {
t := time.NewTicker(h.sendMessageTimeout)
defer t.Stop()
select {
case h.msg <- *msg:

case <-time.After(time.Second * 1):
case <-t.C:
return errors.New("timeout sending message")
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,13 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) (
}

func (s *Service) sendNeighbourMessage() {
t := time.NewTicker(neighbourMessageInterval)
defer t.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-time.After(neighbourMessageInterval):
case <-t.C:
if s.neighbourMessage == nil {
continue
}
Expand Down

0 comments on commit a84466d

Please sign in to comment.