Skip to content

Commit

Permalink
cloudcommon: elect: notifyOne on subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
yousong committed May 11, 2020
1 parent 5155600 commit d845700
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions pkg/cloudcommon/elect/elect.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Elect struct {
config *EtcdConfig

mutex *sync.Mutex
latestEv electEvent
subscribers []chan electEvent
}

Expand Down Expand Up @@ -104,10 +105,12 @@ func NewElect(config *EtcdConfig, key string) (*Elect, error) {
return nil, errors.Wrap(err, "new etcd client")
}
elect := &Elect{
cli: cli,
path: config.LockPrefix + "/" + key,
ttl: config.LockTTL,
mutex: &sync.Mutex{},
cli: cli,
path: config.LockPrefix + "/" + key,
ttl: config.LockTTL,

mutex: &sync.Mutex{},
latestEv: electEventInit,
}
return elect, nil
}
Expand Down Expand Up @@ -173,31 +176,43 @@ func (elect *Elect) do(ctx context.Context) (*ticket, error) {
return r, err
}

func (elect *Elect) subscribe(ch chan electEvent) {
func (elect *Elect) subscribe(ctx context.Context, ch chan electEvent) {
elect.mutex.Lock()
defer elect.mutex.Unlock()
elect.subscribers = append(elect.subscribers, ch)
if ev := elect.latestEv; ev != electEventInit {
elect.notifyOne(ctx, ev, ch)
}
}

func (elect *Elect) notifyOne(ctx context.Context, ev electEvent, ch chan electEvent) {
sent := false
select {
case ch <- ev:
sent = true
case <-ctx.Done():
default:
}
if !sent {
log.Errorf("elect event '%s' missed by %#v", ev, ch)
}
}

func (elect *Elect) notify(ctx context.Context, ev electEvent) {
elect.mutex.Lock()
defer elect.mutex.Unlock()

elect.latestEv = ev
for _, ch := range elect.subscribers {
select {
case ch <- ev:
case <-ctx.Done():
return
default:
}
elect.notifyOne(ctx, ev, ch)
}
}

func (elect *Elect) SubscribeWithAction(ctx context.Context, onWin, onLost func()) {
go func() {
ch := make(chan electEvent, 3)
var ev electEvent
elect.subscribe(ch)
elect.subscribe(ctx, ch)
for {
select {
case ev = <-ch:
Expand Down Expand Up @@ -229,6 +244,7 @@ type electEvent int
const (
electEventWin electEvent = iota
electEventLost
electEventInit
)

func (ev electEvent) String() string {
Expand All @@ -237,6 +253,8 @@ func (ev electEvent) String() string {
return "win"
case electEventLost:
return "lost"
case electEventInit:
return "init"
default:
return "unexpected"
}
Expand Down

0 comments on commit d845700

Please sign in to comment.