Skip to content

Commit

Permalink
Use one single channel and single go routine for updating the device …
Browse files Browse the repository at this point in the history
…state
  • Loading branch information
adibrastegarnia committed Sep 3, 2020
1 parent 4ff9cbf commit a2711ee
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 16 deletions.
47 changes: 39 additions & 8 deletions pkg/southbound/synchronizer/deviceUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
topodevice "github.com/onosproject/onos-topo/api/device"
)

func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
func (sm *SessionManager) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
service topodevice.ServiceState) error {
log.Info("Update device state")

topoDevice, err := s.deviceStore.Get(id)
topoDevice, err := sm.deviceStore.Get(id)
st, ok := status.FromError(err)

// If the device doesn't exist then we should not update its state
Expand All @@ -55,7 +55,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect
protocolState.ChannelState = channel
protocolState.ServiceState = service
topoDevice.Protocols = append(topoDevice.Protocols, protocolState)
mastershipState, err := s.mastershipStore.GetMastership(topoDevice.ID)
mastershipState, err := sm.mastershipStore.GetMastership(topoDevice.ID)
if err != nil {
return err
}
Expand All @@ -65,7 +65,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect
}

topoDevice.Attributes[mastershipTermKey] = strconv.FormatUint(uint64(mastershipState.Term), 10)
_, err = s.deviceStore.Update(topoDevice)
_, err = sm.deviceStore.Update(topoDevice)
if err != nil {
log.Errorf("Device %s is not updated %s", id, err.Error())
return err
Expand All @@ -88,24 +88,55 @@ func remove(s []*topodevice.ProtocolState, i int) []*topodevice.ProtocolState {
return s[:len(s)-1]
}

func (s *Session) updateDeviceState() error {
for event := range s.southboundErrorChan {
func (sm *SessionManager) updateDeviceState() error {
for event := range sm.southboundErrorChan {
log.Info("update event received")
switch event.EventType() {
case events.EventTypeDeviceConnected:
log.Info("Device connected")
id := topodevice.ID(event.Subject())
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {
return s.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,
/* This entire read-modify-write sequence has to be retried until one of two conditions is met:
- The update is successful or
- Upon retry, the node encounters a mastership term greater than its own */

currentTerm, err := sm.sessions[id].getCurrentTerm()
if err != nil {
return nil
}
mastershipState, err := sm.mastershipStore.GetMastership(id)
if err != nil {
return nil
}
if uint64(mastershipState.Term) < uint64(currentTerm) {
return nil
}
err = sm.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,
topodevice.ServiceState_AVAILABLE)
return err

}, backoff.NewExponentialBackOff())
case events.EventTypeErrorDeviceConnect:
id := topodevice.ID(event.Subject())
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {
return s.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,

currentTerm, err := sm.sessions[id].getCurrentTerm()
if err != nil {
return nil
}
mastershipState, err := sm.mastershipStore.GetMastership(id)
if err != nil {
return nil
}
if uint64(mastershipState.Term) < uint64(currentTerm) {
return nil
}

err = sm.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,
topodevice.ServiceState_UNAVAILABLE)
return err
}, backoff.NewExponentialBackOff())

default:
Expand Down
7 changes: 0 additions & 7 deletions pkg/southbound/synchronizer/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ func (s *Session) open() error {
return err
}

go func() {
err := s.updateDeviceState()
if err != nil {
return
}
}()

go func() {
connected := false
state, _ := s.mastershipStore.GetMastership(s.device.ID)
Expand Down
7 changes: 7 additions & 0 deletions pkg/southbound/synchronizer/sessionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ func (sm *SessionManager) Start() error {
}

go sm.processDeviceEvents(sm.topoChannel)
go func() {
err := sm.updateDeviceState()
if err != nil {
return
}
}()

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion test/gnmi/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *TestSuite) SetupTestSuite() error {
err = helm.Chart("onos-config", onostest.OnosChartRepo).
Release("onos-config").
Set("image.tag", "latest").
Set("replicaCount", 2).
Set("replicaCount", 1).
Set("storage.controller", onostest.AtomixController(testName, onosComponentName)).
Install(true)
if err != nil {
Expand Down

0 comments on commit a2711ee

Please sign in to comment.