Skip to content

Commit

Permalink
Implemented two new Events in ringpop for which users can listen to h…
Browse files Browse the repository at this point in the history
…appen.

1. Ready: This event is fired when ringpop changes its state to ready and can be queried via Lookup/LookupN/WhoAmI
2. Destroy: This event signals that ringpop has been destroyed and should not be used anymore.
  • Loading branch information
Nils Dijk committed Mar 11, 2016
1 parent 9054c26 commit b7cfee4
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 1 deletion.
6 changes: 6 additions & 0 deletions events/events.go
Expand Up @@ -49,3 +49,9 @@ type LookupEvent struct {
Key string
Duration time.Duration
}

// Ready is fired when ringpop has successfully bootstrapped and is ready to receive requests and other method calls.
type Ready struct{}

// Destroyed is fired when ringpop has been destroyed and should not be responding to requests or lookup requests.
type Destroyed struct{}
14 changes: 13 additions & 1 deletion ringpop.go
Expand Up @@ -314,11 +314,23 @@ func (rp *Ringpop) getState() state {
return r
}

// setState sets the state of the current Ringpop instance.
// setState sets the state of the current Ringpop instance. It will emit an appropriate
// event when the state will actually change
func (rp *Ringpop) setState(s state) {
rp.stateMutex.Lock()
oldState := rp.state
rp.state = s
rp.stateMutex.Unlock()

// test if the state has changed with this call to setState
if oldState != s {
switch s {
case ready:
rp.emit(events.Ready{})
case destroyed:
rp.emit(events.Destroyed{})
}
}
}

//= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
Expand Down
134 changes: 134 additions & 0 deletions ringpop_test.go
Expand Up @@ -29,19 +29,52 @@ import (
"github.com/stretchr/testify/suite"
"github.com/uber/ringpop-go/discovery/statichosts"
"github.com/uber/ringpop-go/events"
eventsmocks "github.com/uber/ringpop-go/events/test/mocks"
"github.com/uber/ringpop-go/forward"
"github.com/uber/ringpop-go/swim"
"github.com/uber/ringpop-go/test/mocks"
"github.com/uber/tchannel-go"
)

type destroyable interface {
Destroy()
}

type destroyableChannel struct {
*tchannel.Channel
}

func (c *destroyableChannel) Destroy() {
if c.Channel != nil {
c.Channel.Close()
}
}

type RingpopTestSuite struct {
suite.Suite
mockClock *clock.Mock
ringpop *Ringpop
channel *tchannel.Channel
mockRingpop *mocks.Ringpop
mockSwimNode *mocks.SwimNode

destroyables []destroyable
}

func (s *RingpopTestSuite) makeNewRingpop() (rp *Ringpop, err error) {
ch, err := tchannel.NewChannel("test", nil)
s.NoError(err, "channel must create successfully")

err = ch.ListenAndServe("127.0.0.1:0")
s.NoError(err, "channel must listen successfully")

rp, err = New("test", Channel(ch), Clock(s.mockClock))
s.NoError(err, "Ringpop must create successfully")

// collect ringpop and tchannel for destruction later
s.destroyables = append(s.destroyables, &destroyableChannel{ch}, rp)

return
}

// createSingleNodeCluster is a helper function to create a single-node cluster
Expand Down Expand Up @@ -72,8 +105,18 @@ func (s *RingpopTestSuite) SetupTest() {
}

func (s *RingpopTestSuite) TearDownTest() {
// remove listeners during teardown
s.ringpop.listeners = nil

s.channel.Close()
s.ringpop.Destroy()

// clean up all the things
for _, d := range s.destroyables {
if d != nil {
d.Destroy()
}
}
}

func (s *RingpopTestSuite) TestCanAssignRingpopToRingpopInterface() {
Expand Down Expand Up @@ -566,6 +609,97 @@ func (s *RingpopTestSuite) TestStartTimersIdempotance() {
s.ringpop.stopTimers()
}

func (s *RingpopTestSuite) TestReadyEvent() {
called := make(chan bool, 1)

l := &eventsmocks.EventListener{}
l.On("HandleEvent", events.Ready{}).Return().Once().Run(func(args mock.Arguments) {
called <- true
})
s.ringpop.RegisterListener(l)

s.ringpop.setState(ready)

// block with timeout for event to be emitted
select {
case <-called:
case <-time.After(100 * time.Millisecond):
}

l.AssertCalled(s.T(), "HandleEvent", events.Ready{})

s.ringpop.setState(ready)
l.AssertNumberOfCalls(s.T(), "HandleEvent", 1)
}

func (s *RingpopTestSuite) TestDestroyedEvent() {
called := make(chan bool, 1)

l := &eventsmocks.EventListener{}
l.On("HandleEvent", events.Destroyed{}).Return().Once().Run(func(args mock.Arguments) {
called <- true
})
s.ringpop.RegisterListener(l)

s.ringpop.setState(destroyed)

// block with timeout for event to be emitted
select {
case <-called:
case <-time.After(100 * time.Millisecond):
}

l.AssertCalled(s.T(), "HandleEvent", events.Destroyed{})

s.ringpop.setState(destroyed)
l.AssertNumberOfCalls(s.T(), "HandleEvent", 1)
}

func (s *RingpopTestSuite) TestRingIsConstructedWhenStateReady() {
called := make(chan bool, 1)

rp1, err := s.makeNewRingpop()
s.Require().NoError(err)

rp2, err := s.makeNewRingpop()
s.Require().NoError(err)

err = createSingleNodeCluster(rp1)
s.Require().NoError(err)

me1, err := rp1.WhoAmI()
s.Require().NoError(err)

l := &eventsmocks.EventListener{}

l.On("HandleEvent", events.Ready{}).Return().Run(func(args mock.Arguments) {
s.True(rp2.Ready(), "expect ringpop to be ready when the ready event fires")

me2, err := rp2.WhoAmI()
s.Require().NoError(err)

s.True(rp2.ring.HasServer(me1), "expected ringpop1 to be in the ring when ringpop fires the ready event")
s.True(rp2.ring.HasServer(me2), "expected ringpop2 to be in the ring when ringpop fires the ready event")
s.False(rp2.ring.HasServer("127.0.0.1:3001"), "didn't expect the mocked ringpop to be in the ring")

called <- true
})
l.On("HandleEvent", mock.Anything).Return()

rp2.RegisterListener(l)

_, err = rp2.Bootstrap(&swim.BootstrapOptions{
DiscoverProvider: statichosts.New(me1),
})
s.Require().NoError(err)

// block with timeout for event to be emitted
select {
case <-called:
case <-time.After(100 * time.Millisecond):
}
}

func (s *RingpopTestSuite) TestRingChecksumEmitTimer() {
s.ringpop.init()
stats := newDummyStats()
Expand Down

0 comments on commit b7cfee4

Please sign in to comment.