Commit dc77c63
bugfix partition state notification
frairon committed Jun 14, 2020
1 parent f47ae9c
Showing 3 changed files with 23 additions and 13 deletions.
6 changes: 3 additions & 3 deletions partition_table.go
@@ -217,7 +217,7 @@ WaitLoop:
 	case <-ticker.C:
 		p.log.Printf("creating storage for topic %s/%d for %.1f minutes ...", p.topic, p.partition, time.Since(start).Minutes())
 	case <-done:
-		p.log.Printf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
+		p.log.Debugf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
 		if err != nil {
 			return nil, fmt.Errorf("error building storage: %v", err)
 		}
@@ -282,6 +282,8 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 		return
 	}()
 
+	p.state.SetState(State(PartitionConnecting))
+
 	// fetch local offset
 	storedOffset, err = p.st.GetOffset(offsetNotStored)
 	if err != nil {
@@ -333,8 +335,6 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
 
 	defer p.log.Debugf("... Loading done")
 
-	p.state.SetState(State(PartitionConnecting))
-
 	partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
 	if err != nil {
 		errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
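Note: the second and third partition_table.go hunks above move the state notification: load() now reports PartitionConnecting before the stored offset is fetched, not just before the partition consumer is created. A standalone sketch of the ordering change (illustrative names only, not goka code):

    package main

    import "fmt"

    // load signals "connecting" via a callback; this commit moves that
    // signal from just before consumer creation to the top of the function.
    func load(onConnecting func()) {
        onConnecting() // after this commit: fires before storage access
        fmt.Println("fetching local offset from storage ...")
        fmt.Println("creating partition consumer ...")
    }

    func main() {
        load(func() { fmt.Println("state: PartitionConnecting") })
    }

Anything waiting on PartitionConnecting therefore unblocks as soon as loading starts, instead of after a potentially slow storage access.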
12 changes: 6 additions & 6 deletions processor.go
@@ -438,8 +438,8 @@ func (g *Processor) assignmentFromSession(session sarama.ConsumerGroupSession) (
 func (g *Processor) Setup(session sarama.ConsumerGroupSession) error {
 	g.state.SetState(ProcStateSetup)
 	defer g.state.SetState(ProcStateRunning)
-	g.log.Printf("setup generation %d, claims=%#v", session.GenerationID(), session.Claims())
-	defer g.log.Printf("setup generation %d ... done", session.GenerationID())
+	g.log.Debugf("setup generation %d, claims=%#v", session.GenerationID(), session.Claims())
+	defer g.log.Debugf("setup generation %d ... done", session.GenerationID())
 
 	assignment, err := g.assignmentFromSession(session)
 	if err != nil {
@@ -479,8 +479,8 @@ func (g *Processor) Setup(session sarama.ConsumerGroupSession) error {
 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
 // but before the offsets are committed for the very last time.
 func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error {
-	g.log.Printf("Cleaning up for %d", session.GenerationID())
-	defer g.log.Printf("Cleaning up for %d ... done", session.GenerationID())
+	g.log.Debugf("Cleaning up for %d", session.GenerationID())
+	defer g.log.Debugf("Cleaning up for %d ... done", session.GenerationID())
 
 	g.state.SetState(ProcStateStopping)
 	defer g.state.SetState(ProcStateIdle)
@@ -525,8 +525,8 @@ func (g *Processor) WaitForReady() {
 // Once the Messages() channel is closed, the Handler must finish its processing
 // loop and exit.
 func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
-	g.log.Printf("ConsumeClaim for topic/partition %s/%d, initialOffset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
-	defer g.log.Printf("ConsumeClaim done for topic/partition %s/%d", claim.Topic(), claim.Partition())
+	g.log.Debugf("ConsumeClaim for topic/partition %s/%d, initialOffset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
+	defer g.log.Debugf("ConsumeClaim done for topic/partition %s/%d", claim.Topic(), claim.Partition())
 	part, has := g.partitions[claim.Partition()]
 	if !has {
 		return fmt.Errorf("No partition (%d) to handle input in topic %s", claim.Partition(), claim.Topic())
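Note: the processor.go changes (like the first partition_table.go hunk) only demote routine lifecycle messages from Printf to Debugf. The call sites imply a logger of roughly this shape (an assumption, not necessarily goka's actual logger interface):

    type logger interface {
        Printf(format string, args ...interface{})
        Debugf(format string, args ...interface{})
    }

Since Debugf output only appears when debug logging is enabled, the per-generation setup, cleanup, and claim messages vanish from default output.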
18 changes: 14 additions & 4 deletions signal.go
@@ -17,7 +17,7 @@ type waiter struct {
 // Signal allows synchronization on a state, waiting for that state and checking
 // the current state
 type Signal struct {
-	m                    sync.Mutex
+	m                    sync.RWMutex
 	state                State
 	waiters              []*waiter
 	stateChangeObservers []*StateChangeObserver
@@ -45,8 +45,14 @@ func (s *Signal) SetState(state State) *Signal {
 		panic(fmt.Errorf("trying to set illegal state %v", state))
 	}
 
+	// if we're already in the state, do not notify anyone
+	if s.state == state {
+		return s
+	}
+
 	// set the state and notify all channels waiting for it.
 	s.state = state
+
 	var newWaiters []*waiter
 	for _, w := range s.waiters {
 		if w.state == state || (w.minState && state >= w.state) {
@@ -67,11 +73,15 @@ func (s *Signal) SetState(state State) *Signal {
 
 // IsState returns if the signal is in the requested state
 func (s *Signal) IsState(state State) bool {
+	s.m.RLock()
+	defer s.m.RUnlock()
 	return s.state == state
 }
 
 // State returns the current state
 func (s *Signal) State() State {
+	s.m.RLock()
+	defer s.m.RUnlock()
 	return s.state
 }
 

@@ -94,7 +104,7 @@ func (s *Signal) WaitForStateMin(state State) chan struct{} {
 // state.
 func (s *Signal) WaitForState(state State) chan struct{} {
 	s.m.Lock()
-	defer s.m.Unlock()
+	s.m.Unlock()
 
 	w := &waiter{
 		done: make(chan struct{}),
@@ -108,7 +118,7 @@ func (s *Signal) waitForWaiter(state State, w *waiter) chan struct{} {
 
 	// if the signal is currently in that state (or in a higher state if minState is set)
 	// then close the waiter immediately
-	if curState := s.State(); state == curState || (w.minState && curState >= state) {
+	if curState := s.state; state == curState || (w.minState && curState >= state) {
 		close(w.done)
 	} else {
 		s.waiters = append(s.waiters, w)
@@ -157,7 +167,7 @@ func (s *Signal) ObserveStateChange() *StateChangeObserver {
 	}
 
 	// initialize the observer with the current state
-	observer.notify(s.State())
+	observer.notify(s.state)
 
 	// the stop funtion stops the observer by closing its channel
 	// and removing it from the list of observers
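Note: the heart of the fix is the early return added to SetState above: a transition to the state the signal already holds no longer wakes waiters or notifies observers. A minimal standalone illustration of the guard (simplified types, not goka's actual Signal):

    package main

    import "fmt"

    type State int

    type signal struct {
        state     State
        observers []chan State
    }

    func (s *signal) SetState(state State) {
        // the guard added in this commit: a no-op transition notifies nobody
        if s.state == state {
            return
        }
        s.state = state
        for _, obs := range s.observers {
            obs <- state
        }
    }

    func main() {
        obs := make(chan State, 10)
        s := &signal{observers: []chan State{obs}}
        s.SetState(1)
        s.SetState(1) // duplicate: dropped, the observer sees a single "1"
        s.SetState(2)
        close(obs)
        for st := range obs {
            fmt.Println("observed state", st) // prints 1, then 2
        }
    }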

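Note: the switch from sync.Mutex to sync.RWMutex also explains why waitForWaiter and ObserveStateChange now read s.state directly rather than calling s.State(): the accessor now takes a read lock, and Go's RWMutex is not reentrant, so calling it from a path that already holds the write lock would self-deadlock. A sketch of the hazard (illustrative, not goka code):

    package main

    import "sync"

    type sig struct {
        m     sync.RWMutex
        state int
    }

    // State takes a read lock, as in this commit.
    func (s *sig) State() int {
        s.m.RLock()
        defer s.m.RUnlock()
        return s.state
    }

    func (s *sig) setState(v int) {
        s.m.Lock()
        defer s.m.Unlock()
        // _ = s.State() // would self-deadlock: RLock waits on the held Lock
        _ = s.state // safe: read the field directly while the lock is held
        s.state = v
    }

    func main() { (&sig{}).setState(1) }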