Skip to content

Commit

Permalink
Merge fb7eeaa into d564019
Browse files Browse the repository at this point in the history
  • Loading branch information
Adib Rastegarnia committed Aug 19, 2020
2 parents d564019 + fb7eeaa commit 5384d34
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 81 deletions.
12 changes: 7 additions & 5 deletions pkg/controller/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
package controller

import (
"regexp"

"github.com/onosproject/onos-config/api/types"
mastershipstore "github.com/onosproject/onos-config/pkg/store/mastership"
topodevice "github.com/onosproject/onos-topo/api/device"
"regexp"
)

// Filter filters individual events for a node
Expand All @@ -43,11 +44,12 @@ func (f *MastershipFilter) Accept(id types.ID) bool {
if err != nil {
return false
}
master, err := f.Store.IsMaster(device)
if err != nil {
return false
master, err := f.Store.GetMastership(device)
if err == nil && master != nil {
return true
}
return master

return false
}

var _ Filter = &MastershipFilter{}
Expand Down
21 changes: 11 additions & 10 deletions pkg/controller/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
package controller

import (
"testing"
"time"

"github.com/onosproject/onos-config/api/types"
"github.com/onosproject/onos-config/pkg/store/cluster"
"github.com/onosproject/onos-config/pkg/store/mastership"
topodevice "github.com/onosproject/onos-topo/api/device"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

type testDeviceResolver struct {
Expand Down Expand Up @@ -54,21 +55,21 @@ func TestMastershipFilter(t *testing.T) {
device1 := topodevice.ID("device1")
device2 := topodevice.ID("device2")

master, err := store1.IsMaster(device1)
master, err := store1.GetMastership(device1)
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)

master, err = store2.IsMaster(device1)
master, err = store2.GetMastership(device1)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

master, err = store2.IsMaster(device2)
master, err = store2.GetMastership(device2)
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)

master, err = store1.IsMaster(device2)
master, err = store1.GetMastership(device2)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

assert.True(t, filter1.Accept(types.ID(device1)))
assert.False(t, filter2.Accept(types.ID(device1)))
Expand Down
17 changes: 9 additions & 8 deletions pkg/store/mastership/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/atomix/go-client/pkg/client"
"github.com/atomix/go-client/pkg/client/election"
"github.com/atomix/go-client/pkg/client/primitive"
"github.com/atomix/go-client/pkg/client/util/net"
"github.com/onosproject/onos-config/pkg/store/cluster"
topodevice "github.com/onosproject/onos-topo/api/device"
"io"
"sync"
"time"
)

// newAtomixElection returns a new persistent device mastership election
Expand Down Expand Up @@ -82,8 +83,8 @@ type deviceMastershipElection interface {
// DeviceID returns the device for which this election provides mastership
DeviceID() topodevice.ID

// isMaster returns a bool indicating whether the local node is the master for the device
isMaster() (bool, error)
// getMastership returns the mastership info
getMastership() *Mastership

// watch watches the election for changes
watch(ch chan<- Mastership) error
Expand Down Expand Up @@ -168,13 +169,13 @@ func (e *atomixDeviceMastershipElection) watchElection(ch <-chan *election.Event
}
}

func (e *atomixDeviceMastershipElection) isMaster() (bool, error) {
func (e *atomixDeviceMastershipElection) getMastership() *Mastership {
e.mu.RLock()
defer e.mu.RUnlock()
if e.mastership == nil || string(e.mastership.Master) != e.election.ID() {
return false, nil
return nil
}
return true, nil
return e.mastership
}

func (e *atomixDeviceMastershipElection) watch(ch chan<- Mastership) error {
Expand Down
32 changes: 14 additions & 18 deletions pkg/store/mastership/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package mastership

import (
"testing"

"github.com/onosproject/onos-config/pkg/store/cluster"
"github.com/onosproject/onos-lib-go/pkg/atomix"
topodevice "github.com/onosproject/onos-topo/api/device"
"github.com/stretchr/testify/assert"
"testing"
)

func TestMastershipElection(t *testing.T) {
Expand All @@ -42,44 +43,39 @@ func TestMastershipElection(t *testing.T) {
err = store3.watch(store3Ch)
assert.NoError(t, err)

master, err := store1.isMaster()
assert.NoError(t, err)
assert.True(t, master)
master := store1.getMastership()
assert.NotNil(t, master)

master, err = store2.isMaster()
assert.NoError(t, err)
assert.False(t, master)
master = store2.getMastership()
assert.Nil(t, master)

master, err = store3.isMaster()
assert.NoError(t, err)
assert.False(t, master)
master = store3.getMastership()
assert.Nil(t, master)

err = store1.Close()
assert.NoError(t, err)

mastership := <-store2Ch
assert.Equal(t, cluster.NodeID("b"), mastership.Master)

master, err = store2.isMaster()
assert.NoError(t, err)
assert.True(t, master)
master = store2.getMastership()
assert.NotNil(t, master)

mastership = <-store3Ch
assert.Equal(t, cluster.NodeID("b"), mastership.Master)

master, err = store3.isMaster()
assert.NoError(t, err)
assert.False(t, master)
master = store3.getMastership()
assert.Nil(t, master)

err = store2.Close()
assert.NoError(t, err)

mastership = <-store3Ch
assert.Equal(t, cluster.NodeID("c"), mastership.Master)

master, err = store3.isMaster()
master = store3.getMastership()
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)

_ = store3.Close()
}
16 changes: 9 additions & 7 deletions pkg/store/mastership/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
package mastership

import (
"io"
"sync"

"github.com/atomix/go-client/pkg/client/util/net"
"github.com/onosproject/onos-config/pkg/config"
"github.com/onosproject/onos-config/pkg/store/cluster"
"github.com/onosproject/onos-lib-go/pkg/atomix"
topodevice "github.com/onosproject/onos-topo/api/device"
"io"
"sync"
)

// Term is a monotonically increasing mastership term
Expand All @@ -34,8 +35,8 @@ type Store interface {
// NodeID returns the local node identifier used in mastership elections
NodeID() cluster.NodeID

// IsMaster returns a boolean indicating whether the local node is the master for the given device
IsMaster(id topodevice.ID) (bool, error)
// GetMastership returns the mastership for a given device
GetMastership(id topodevice.ID) (*Mastership, error)

// Watch watches the store for mastership changes
Watch(topodevice.ID, chan<- Mastership) error
Expand Down Expand Up @@ -126,12 +127,13 @@ func (s *atomixStore) NodeID() cluster.NodeID {
return s.nodeID
}

func (s *atomixStore) IsMaster(deviceID topodevice.ID) (bool, error) {
func (s *atomixStore) GetMastership(deviceID topodevice.ID) (*Mastership, error) {
election, err := s.getElection(deviceID)
if err != nil {
return false, err
return nil, err
}
return election.isMaster()

return election.getMastership(), nil
}

func (s *atomixStore) Watch(deviceID topodevice.ID, ch chan<- Mastership) error {
Expand Down
64 changes: 41 additions & 23 deletions pkg/store/mastership/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
package mastership

import (
"testing"

"github.com/onosproject/onos-config/pkg/store/cluster"
"github.com/onosproject/onos-lib-go/pkg/atomix"
topodevice "github.com/onosproject/onos-topo/api/device"
"github.com/stretchr/testify/assert"
"testing"
)

func TestMastershipStore(t *testing.T) {
Expand All @@ -44,44 +45,51 @@ func TestMastershipStore(t *testing.T) {

// Verify that the first node that checks mastership for a device wins the election
// and no other node believes itself to be the master
master, err := store1.IsMaster(device1)
master, err := store1.GetMastership(device1)
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)
// Verify the master term for the given device is correct
// Since the election is performed once for device1 then the term will be 1
assert.Equal(t, master.Term, Term(1))

master, err = store2.IsMaster(device1)
master, err = store2.GetMastership(device1)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

master, err = store3.IsMaster(device1)
master, err = store3.GetMastership(device1)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

// Verify that listening for events for a device enters a node into the device mastership election
store2Ch2 := make(chan Mastership)
err = store2.Watch(device2, store2Ch2)
assert.NoError(t, err)

// Verify that the watching node is the master
master, err = store2.IsMaster(device2)
master, err = store2.GetMastership(device2)
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)

master, err = store1.IsMaster(device2)
// Verify the master term for the given device is correct
// Since the election is performed once for device2 then the term will be 1
assert.Equal(t, master.Term, Term(1))

master, err = store1.GetMastership(device2)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

master, err = store3.IsMaster(device2)
master, err = store3.GetMastership(device2)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

// Watch device2 mastership on an additional node and verify that it does not cause a mastership change
store3Ch2 := make(chan Mastership)
err = store3.Watch(device2, store3Ch2)
assert.NoError(t, err)

master, err = store3.IsMaster(device1)
master, err = store3.GetMastership(device1)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

// Listen for device1 events on remaining nodes
store2Ch1 := make(chan Mastership)
Expand All @@ -100,18 +108,22 @@ func TestMastershipStore(t *testing.T) {
assert.Equal(t, device1, mastership.Device)
assert.Equal(t, node2, mastership.Master)

master, err = store2.IsMaster(device1)
master, err = store2.GetMastership(device1)
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)

// Verify the master term for the given device is correct
// Since the master of device1 is changed then its term has been increased by 1
assert.Equal(t, master.Term, Term(2))

// node3 should not be the master for device1
mastership = <-store3Ch1
assert.Equal(t, device1, mastership.Device)
assert.Equal(t, node2, mastership.Master)

master, err = store3.IsMaster(device1)
master, err = store3.GetMastership(device1)
assert.NoError(t, err)
assert.False(t, master)
assert.Nil(t, master)

// Close node2 and verify mastership for both devices changes
err = store2.Close()
Expand All @@ -122,18 +134,24 @@ func TestMastershipStore(t *testing.T) {
assert.Equal(t, device1, mastership.Device)
assert.Equal(t, node3, mastership.Master)

master, err = store3.IsMaster(device1)
master, err = store3.GetMastership(device1)
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)

// Verify the master term for the given device is correct
// Since master of device1 has been changed its term has been increased by 1
assert.Equal(t, master.Term, Term(3))

// node3 should also be the master for device2
mastership = <-store3Ch2
assert.Equal(t, device2, mastership.Device)
assert.Equal(t, node3, mastership.Master)

master, err = store3.IsMaster(device2)
master, err = store3.GetMastership(device2)
assert.NoError(t, err)
assert.True(t, master)
assert.NotNil(t, master)
// Since master of device2 has been changed its term has been increased by 1
assert.Equal(t, master.Term, Term(2))

_ = store3.Close()
}
Loading

0 comments on commit 5384d34

Please sign in to comment.