Skip to content

Commit

Permalink
Fix get current term
Browse files Browse the repository at this point in the history
  • Loading branch information
adibrastegarnia committed Sep 1, 2020
1 parent 1bf23e1 commit 37887ee
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 16 deletions.
20 changes: 12 additions & 8 deletions pkg/southbound/synchronizer/deviceUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package synchronizer

import (
"strconv"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -25,10 +27,10 @@ import (
topodevice "github.com/onosproject/onos-topo/api/device"
)

func (s *Session) updateDevice(connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
service topodevice.ServiceState) error {
log.Info("Update device state")
id := s.device.ID

topoDevice, err := s.deviceStore.Get(id)

st, ok := status.FromError(err)
Expand All @@ -48,9 +50,7 @@ func (s *Session) updateDevice(connectivity topodevice.ConnectivityState, channe
} else {
protocolState = new(topodevice.ProtocolState)
}
if protocolState.ConnectivityState == connectivity && protocolState.ChannelState == channel && protocolState.ServiceState == service {
return nil
}

protocolState.Protocol = topodevice.Protocol_GNMI
protocolState.ConnectivityState = connectivity
protocolState.ChannelState = channel
Expand All @@ -60,10 +60,12 @@ func (s *Session) updateDevice(connectivity topodevice.ConnectivityState, channe
if err != nil {
return err
}

if topoDevice.Attributes == nil {
topoDevice.Attributes = make(map[string]string)
}
topoDevice.Attributes[mastershipTermKey] = string(mastershipState.Term)

topoDevice.Attributes[mastershipTermKey] = strconv.FormatUint(uint64(mastershipState.Term), 10)
_, err = s.deviceStore.Update(topoDevice)
if err != nil {
log.Errorf("Device %s is not updated %s", id, err.Error())
Expand Down Expand Up @@ -93,15 +95,17 @@ func (s *Session) updateDeviceState() error {
switch event.EventType() {
case events.EventTypeDeviceConnected:
log.Info("Device connected")
id := topodevice.ID(event.Subject())
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {
return s.updateDevice(topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,
return s.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,
topodevice.ServiceState_AVAILABLE)
}, backoff.NewExponentialBackOff())
case events.EventTypeErrorDeviceConnect:
id := topodevice.ID(event.Subject())
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {
return s.updateDevice(topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,
return s.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,
topodevice.ServiceState_UNAVAILABLE)
}, backoff.NewExponentialBackOff())

Expand Down
27 changes: 20 additions & 7 deletions pkg/southbound/synchronizer/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ type Session struct {
mu sync.RWMutex
}

func (s *Session) getCurrentTerm() (int, error) {

device, err := s.deviceStore.Get(s.device.ID)
if err != nil {
return 0, err
}

term := device.Attributes[mastershipTermKey]
return strconv.Atoi(term)
}

// open open a new gNMI session
func (s *Session) open() error {
log.Info("Opening a gNMI session")
Expand All @@ -87,11 +98,10 @@ func (s *Session) open() error {
s.mu.Lock()
s.mastershipState = state
s.mu.Unlock()
var currentTerm int
if s.device.Attributes == nil {
currentTerm = 0
} else {
currentTerm, _ = strconv.Atoi(s.device.Attributes[mastershipTermKey])

currentTerm, err := s.getCurrentTerm()
if err != nil {
log.Error(err)
}

if state.Master == s.mastershipStore.NodeID() && uint64(state.Term) >= uint64(currentTerm) {
Expand All @@ -112,9 +122,12 @@ func (s *Session) open() error {
s.mu.Lock()
s.mastershipState = &state
s.mu.Unlock()
currentTerm, _ := strconv.Atoi(s.device.Attributes[mastershipTermKey])
currentTerm, err := s.getCurrentTerm()
if err != nil {
log.Error(err)
}
if state.Master == s.mastershipStore.NodeID() && !connected && uint64(state.Term) >= uint64(currentTerm) {
log.Info("Election changed", s.mastershipStore.NodeID())
log.Info("Election changed", s.mastershipStore.NodeID(), currentTerm, state.Term)
err := s.connect()
if err != nil {
log.Error(err)
Expand Down
3 changes: 3 additions & 0 deletions pkg/southbound/synchronizer/sessionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ func (sm *SessionManager) createSession(device *topodevice.Device) error {
mastershipStore: sm.mastershipStore,
deviceStore: sm.deviceStore,
}
if session.device.Attributes == nil {
session.device.Attributes = make(map[string]string)
}

err := session.open()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion test/gnmi/subscribetest.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *TestSuite) TestSubscribeOnce(t *testing.T) {
err = subC.Subscribe(gnmi.MakeContext(), *q, "gnmi")
defer subC.Close()
assert.NoError(t, err)
gnmi.DeleteSimulator(t, simulator)
//gnmi.DeleteSimulator(t, simulator)
}

// TestSubscribe tests a stream subscription to updates to a device
Expand Down

0 comments on commit 37887ee

Please sign in to comment.