Skip to content

Commit

Permalink
Fix data race on partition recvChan
Browse files Browse the repository at this point in the history
  • Loading branch information
tylertreat committed Sep 15, 2020
1 parent 6c457e3 commit e80f556
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions server/partition.go
Expand Up @@ -69,7 +69,6 @@ type partition struct {
sub *nats.Subscription // Subscription to partition NATS subject
leaderReplSub *nats.Subscription // Subscription for replication requests from followers
leaderOffsetSub *nats.Subscription // Subscription for leader epoch offset requests from followers
recvChan chan *nats.Msg // Channel leader places received messages on
log commitlog.CommitLog
srv *Server
isLeading bool
Expand Down Expand Up @@ -393,10 +392,10 @@ func (p *partition) becomeLeader(epoch uint64) error {
}

// Start message processing loop.
p.recvChan = make(chan *nats.Msg, recvChannelSize)
recvChan := make(chan *nats.Msg, recvChannelSize)
p.stopLeader = make(chan struct{})
p.srv.startGoroutine(func() {
p.messageProcessingLoop(p.recvChan, p.stopLeader, epoch)
p.messageProcessingLoop(recvChan, p.stopLeader, epoch)
p.shutdown.Done()
})

Expand All @@ -406,7 +405,7 @@ func (p *partition) becomeLeader(epoch uint64) error {
// Subscribe to the NATS subject and begin sequencing messages.
// TODO: This should be drained on shutdown.
sub, err := p.srv.nc.QueueSubscribe(p.getSubject(), p.Group, func(m *nats.Msg) {
p.recvChan <- m
recvChan <- m
})
if err != nil {
return errors.Wrap(err, "failed to subscribe to NATS")
Expand Down Expand Up @@ -481,7 +480,6 @@ func (p *partition) stopLeading() error {

p.commitQueue.Dispose()
p.isLeading = false
p.recvChan = nil // Nil this out since it's a non-trivial amount of memory

return nil
}
Expand Down

0 comments on commit e80f556

Please sign in to comment.