Skip to content

Commit

Permalink
Merge branch 'master' into sync
Browse files Browse the repository at this point in the history
  • Loading branch information
nrwiersma committed Jan 8, 2019
2 parents da1e149 + c57007c commit 47631f8
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 23 deletions.
36 changes: 36 additions & 0 deletions kafka/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,42 @@ func TestSource_Consume(t *testing.T) {
assert.Equal(t, []byte("foo"), msg.Value)
}

func TestSource_ConsumeError(t *testing.T) {
broker0 := sarama.NewMockBroker(t, 0)
defer broker0.Close()
broker0.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("test_topic", 0, broker0.BrokerID()),
"FindCoordinatorRequest": sarama.NewMockFindCoordinatorResponse(t).
SetCoordinator(sarama.CoordinatorGroup, "test_group", broker0),
"JoinGroupRequest": sarama.NewMockWrapper(&sarama.JoinGroupResponse{
Version: 1,
Err: sarama.ErrNoError,
GroupProtocol: "protocol",
}),
"SyncGroupRequest": sarama.NewMockWrapper(&sarama.SyncGroupResponse{
Err: sarama.ErrBrokerNotAvailable,
MemberAssignment: []byte{},
}),
"LeaveGroupRequest": sarama.NewMockWrapper(&sarama.LeaveGroupResponse{
Err: sarama.ErrNoError,
}),
})
c := kafka.NewSourceConfig()
c.Brokers = []string{broker0.Addr()}
c.Topic = "test_topic"
c.GroupID = "test_group"
s, _ := kafka.NewSource(c)
defer s.Close()

time.Sleep(500 * time.Millisecond)

_, err := s.Consume()

assert.Error(t, err)
}

func TestSource_Commit(t *testing.T) {
broker0 := sarama.NewMockBroker(t, 0)
defer broker0.Close()
Expand Down
34 changes: 17 additions & 17 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,16 @@ type timedSupervisor struct {
errFn ErrorFunc

t *time.Ticker
resetCh chan struct{}
commits uint32
running uint32
}

// NewTimedSupervisor returns a supervisor that commits automatically.
func NewTimedSupervisor(inner Supervisor, d time.Duration, errFn ErrorFunc) Supervisor {
return &timedSupervisor{
inner: inner,
d: d,
errFn: errFn,
resetCh: make(chan struct{}, 1),
inner: inner,
d: d,
errFn: errFn,
}
}

Expand All @@ -208,17 +207,16 @@ func (s *timedSupervisor) Start() error {
s.t = time.NewTicker(s.d)

go func() {
for {
select {
case <-s.t.C:
err := s.inner.Commit(nil)
if err != nil {
s.errFn(err)
}

case <-s.resetCh:
s.t.Stop()
s.t = time.NewTicker(s.d)
for range s.t.C {
// If there was a commit triggered "manually" by a Committer, skip a single timed commit.
if atomic.LoadUint32(&s.commits) > 0 {
atomic.StoreUint32(&s.commits, 0)
continue
}

err := s.inner.Commit(nil)
if err != nil {
s.errFn(err)
}
}
}()
Expand All @@ -245,11 +243,13 @@ func (s *timedSupervisor) Commit(caller Processor) error {
return ErrNotRunning
}

// Increment the commit count
atomic.AddUint32(&s.commits, 1)

err := s.inner.Commit(caller)
if err != nil {
return err
}
s.resetCh <- struct{}{}

return nil
}
Expand Down
10 changes: 5 additions & 5 deletions supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,22 +381,22 @@ func TestTimedSupervisor_Commit(t *testing.T) {
inner.AssertCalled(t, "Commit", caller)
}

func TestTimedSupervisor_CommitResetsTimer(t *testing.T) {
func TestTimedSupervisor_ManualCommitSkipsTimedCommit(t *testing.T) {
caller := new(MockProcessor)
inner := new(MockSupervisor)
inner.On("Start").Return(nil)
inner.On("Commit", mock.Anything).Return(nil)
inner.On("Commit", caller).Return(nil)
inner.On("Close").Return(nil)

supervisor := streams.NewTimedSupervisor(inner, 10*time.Millisecond, nil)
supervisor := streams.NewTimedSupervisor(inner, 5*time.Millisecond, nil)
_ = supervisor.Start()
defer supervisor.Close()

time.Sleep(5 * time.Millisecond)
time.Sleep(2 * time.Millisecond)

_ = supervisor.Commit(caller)

time.Sleep(5 * time.Millisecond)
time.Sleep(4 * time.Millisecond)

inner.AssertNumberOfCalls(t, "Commit", 1)
}
Expand Down
3 changes: 2 additions & 1 deletion task.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task {
topology: topology,
mode: Async,
store: store,
errorFn: func(_ error) {},
supervisorOpts: supervisorOpts{
Strategy: Lossless,
Interval: 0,
Expand All @@ -92,7 +93,7 @@ func NewTask(topology *Topology, opts ...TaskOptFunc) Task {

t.supervisor = NewSupervisor(t.store, t.supervisorOpts.Strategy)
if t.supervisorOpts.Interval > 0 {
t.supervisor = NewTimedSupervisor(t.supervisor, t.supervisorOpts.Interval, t.errorFn)
t.supervisor = NewTimedSupervisor(t.supervisor, t.supervisorOpts.Interval, t.handleError)
}

return t
Expand Down

0 comments on commit 47631f8

Please sign in to comment.