From a2711eefcfd1c1b610fbc8884e68503ce42894f0 Mon Sep 17 00:00:00 2001 From: adibrastegarnia Date: Wed, 2 Sep 2020 17:02:13 -0700 Subject: [PATCH] Use one single channel and single go routine for updating the device state --- pkg/southbound/synchronizer/deviceUpdate.go | 47 +++++++++++++++---- pkg/southbound/synchronizer/session.go | 7 --- pkg/southbound/synchronizer/sessionManager.go | 7 +++ test/gnmi/suite.go | 2 +- 4 files changed, 47 insertions(+), 16 deletions(-) diff --git a/pkg/southbound/synchronizer/deviceUpdate.go b/pkg/southbound/synchronizer/deviceUpdate.go index 0c5ee834f..d3b65869b 100644 --- a/pkg/southbound/synchronizer/deviceUpdate.go +++ b/pkg/southbound/synchronizer/deviceUpdate.go @@ -27,11 +27,11 @@ import ( topodevice "github.com/onosproject/onos-topo/api/device" ) -func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState, +func (sm *SessionManager) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState, service topodevice.ServiceState) error { log.Info("Update device state") - topoDevice, err := s.deviceStore.Get(id) + topoDevice, err := sm.deviceStore.Get(id) st, ok := status.FromError(err) // If the device doesn't exist then we should not update its state @@ -55,7 +55,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect protocolState.ChannelState = channel protocolState.ServiceState = service topoDevice.Protocols = append(topoDevice.Protocols, protocolState) - mastershipState, err := s.mastershipStore.GetMastership(topoDevice.ID) + mastershipState, err := sm.mastershipStore.GetMastership(topoDevice.ID) if err != nil { return err } @@ -65,7 +65,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect } topoDevice.Attributes[mastershipTermKey] = strconv.FormatUint(uint64(mastershipState.Term), 10) - _, err = s.deviceStore.Update(topoDevice) + _, err = sm.deviceStore.Update(topoDevice) if err != nil { log.Errorf("Device %s is not updated %s", id, err.Error()) return err @@ -88,8 +88,8 @@ func remove(s []*topodevice.ProtocolState, i int) []*topodevice.ProtocolState { return s[:len(s)-1] } -func (s *Session) updateDeviceState() error { - for event := range s.southboundErrorChan { +func (sm *SessionManager) updateDeviceState() error { + for event := range sm.southboundErrorChan { log.Info("update event received") switch event.EventType() { case events.EventTypeDeviceConnected: @@ -97,15 +97,46 @@ func (s *Session) updateDeviceState() error { id := topodevice.ID(event.Subject()) // TODO: Retry only on write conflicts return backoff.Retry(func() error { - return s.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED, + /* This entire read-modify-write sequence has to be retried until one of two conditions is met: + - The update is successful or + - Upon retry, the node encounters a mastership term greater than its own */ + + currentTerm, err := sm.sessions[id].getCurrentTerm() + if err != nil { + return nil + } + mastershipState, err := sm.mastershipStore.GetMastership(id) + if err != nil { + return nil + } + if uint64(mastershipState.Term) < uint64(currentTerm) { + return nil + } + err = sm.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED, topodevice.ServiceState_AVAILABLE) + return err + }, backoff.NewExponentialBackOff()) case events.EventTypeErrorDeviceConnect: id := topodevice.ID(event.Subject()) // TODO: Retry only on write conflicts return backoff.Retry(func() error { - return s.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED, + + currentTerm, err := sm.sessions[id].getCurrentTerm() + if err != nil { + return nil + } + mastershipState, err := sm.mastershipStore.GetMastership(id) + if err != nil { + return nil + } + if uint64(mastershipState.Term) < uint64(currentTerm) { + return nil + } + + err = sm.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED, topodevice.ServiceState_UNAVAILABLE) + return err }, backoff.NewExponentialBackOff()) default: diff --git a/pkg/southbound/synchronizer/session.go b/pkg/southbound/synchronizer/session.go index 34b2f4161..58f9398ae 100644 --- a/pkg/southbound/synchronizer/session.go +++ b/pkg/southbound/synchronizer/session.go @@ -86,13 +86,6 @@ func (s *Session) open() error { return err } - go func() { - err := s.updateDeviceState() - if err != nil { - return - } - }() - go func() { connected := false state, _ := s.mastershipStore.GetMastership(s.device.ID) diff --git a/pkg/southbound/synchronizer/sessionManager.go b/pkg/southbound/synchronizer/sessionManager.go index 6db03206c..f338d1e70 100644 --- a/pkg/southbound/synchronizer/sessionManager.go +++ b/pkg/southbound/synchronizer/sessionManager.go @@ -150,6 +150,13 @@ func (sm *SessionManager) Start() error { } go sm.processDeviceEvents(sm.topoChannel) + go func() { + err := sm.updateDeviceState() + if err != nil { + return + } + }() + return nil } diff --git a/test/gnmi/suite.go b/test/gnmi/suite.go index 7bbfc0ecc..32ee82d9c 100644 --- a/test/gnmi/suite.go +++ b/test/gnmi/suite.go @@ -66,7 +66,7 @@ func (s *TestSuite) SetupTestSuite() error { err = helm.Chart("onos-config", onostest.OnosChartRepo). Release("onos-config"). Set("image.tag", "latest"). - Set("replicaCount", 2). + Set("replicaCount", 1). Set("storage.controller", onostest.AtomixController(testName, onosComponentName)). Install(true) if err != nil {