Skip to content

Commit

Permalink
checking term for updating the device state
Browse files Browse the repository at this point in the history
  • Loading branch information
adibrastegarnia committed Sep 1, 2020
1 parent b0865b8 commit 1bf23e1
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 11 deletions.
20 changes: 14 additions & 6 deletions pkg/southbound/synchronizer/deviceUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (

"github.com/onosproject/onos-config/pkg/events"

devicestore "github.com/onosproject/onos-config/pkg/store/device"
topodevice "github.com/onosproject/onos-topo/api/device"
)

func updateDevice(deviceStore devicestore.Store, id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
func (s *Session) updateDevice(connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
service topodevice.ServiceState) error {
log.Info("Update device state")
topoDevice, err := deviceStore.Get(id)
id := s.device.ID
topoDevice, err := s.deviceStore.Get(id)

st, ok := status.FromError(err)

Expand All @@ -56,7 +56,15 @@ func updateDevice(deviceStore devicestore.Store, id topodevice.ID, connectivity
protocolState.ChannelState = channel
protocolState.ServiceState = service
topoDevice.Protocols = append(topoDevice.Protocols, protocolState)
_, err = deviceStore.Update(topoDevice)
mastershipState, err := s.mastershipStore.GetMastership(topoDevice.ID)
if err != nil {
return err
}
if topoDevice.Attributes == nil {
topoDevice.Attributes = make(map[string]string)
}
topoDevice.Attributes[mastershipTermKey] = string(mastershipState.Term)
_, err = s.deviceStore.Update(topoDevice)
if err != nil {
log.Errorf("Device %s is not updated %s", id, err.Error())
return err
Expand Down Expand Up @@ -87,13 +95,13 @@ func (s *Session) updateDeviceState() error {
log.Info("Device connected")
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {
return updateDevice(s.deviceStore, s.device.ID, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,
return s.updateDevice(topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,
topodevice.ServiceState_AVAILABLE)
}, backoff.NewExponentialBackOff())
case events.EventTypeErrorDeviceConnect:
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {
return updateDevice(s.deviceStore, s.device.ID, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,
return s.updateDevice(topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,
topodevice.ServiceState_UNAVAILABLE)
}, backoff.NewExponentialBackOff())

Expand Down
15 changes: 15 additions & 0 deletions pkg/southbound/synchronizer/deviceUpdate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2020-present Open Networking Foundation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package synchronizer
20 changes: 15 additions & 5 deletions pkg/southbound/synchronizer/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package synchronizer

import (
"context"
"strconv"
"sync"
"time"

Expand All @@ -38,8 +39,9 @@ import (
)

const (
backoffInterval = 10 * time.Millisecond
maxBackoffTime = 5 * time.Second
backoffInterval = 10 * time.Millisecond
maxBackoffTime = 5 * time.Second
mastershipTermKey = "onos-config.mastership.term"
)

// Session a gNMI session
Expand Down Expand Up @@ -85,8 +87,15 @@ func (s *Session) open() error {
s.mu.Lock()
s.mastershipState = state
s.mu.Unlock()
if state.Master == s.mastershipStore.NodeID() {
log.Info("Master node", s.mastershipStore.NodeID())
var currentTerm int
if s.device.Attributes == nil {
currentTerm = 0
} else {
currentTerm, _ = strconv.Atoi(s.device.Attributes[mastershipTermKey])
}

if state.Master == s.mastershipStore.NodeID() && uint64(state.Term) >= uint64(currentTerm) {
log.Info("Master node", s.mastershipStore.NodeID(), ":", currentTerm, ":", state.Term)
err := s.connect()
if err != nil {
log.Error(err)
Expand All @@ -103,7 +112,8 @@ func (s *Session) open() error {
s.mu.Lock()
s.mastershipState = &state
s.mu.Unlock()
if state.Master == s.mastershipStore.NodeID() && !connected {
currentTerm, _ := strconv.Atoi(s.device.Attributes[mastershipTermKey])
if state.Master == s.mastershipStore.NodeID() && !connected && uint64(state.Term) >= uint64(currentTerm) {
log.Info("Election changed", s.mastershipStore.NodeID())
err := s.connect()
if err != nil {
Expand Down

0 comments on commit 1bf23e1

Please sign in to comment.