diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 0cc4c9cda..60df2b135 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -105,7 +105,7 @@ func NewManager(leadershipStore leadership.Store, mastershipStore mastership.Sto TopoChannel: make(chan *topodevice.ListResponse, 10), ModelRegistry: modelReg, OperationalStateChannel: make(chan events.OperationalStateEvent), - SouthboundErrorChan: make(chan events.DeviceResponse, 10), + SouthboundErrorChan: make(chan events.DeviceResponse), Dispatcher: dispatcher.NewDispatcher(), OperationalStateCache: make(map[topodevice.ID]devicechange.TypedValueMap), OperationalStateCacheLock: &sync.RWMutex{}, diff --git a/pkg/southbound/synchronizer/deviceUpdate.go b/pkg/southbound/synchronizer/deviceUpdate.go index 52bbe2e81..43b2d1f9c 100644 --- a/pkg/southbound/synchronizer/deviceUpdate.go +++ b/pkg/southbound/synchronizer/deviceUpdate.go @@ -27,11 +27,11 @@ import ( topodevice "github.com/onosproject/onos-topo/api/device" ) -func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState, +func (sm *SessionManager) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState, service topodevice.ServiceState) error { log.Info("Update device state") - topoDevice, err := s.deviceStore.Get(id) + topoDevice, err := sm.deviceStore.Get(id) st, ok := status.FromError(err) // If the device doesn't exist then we should not update its state @@ -55,7 +55,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect protocolState.ChannelState = channel protocolState.ServiceState = service topoDevice.Protocols = append(topoDevice.Protocols, protocolState) - mastershipState, err := s.mastershipStore.GetMastership(topoDevice.ID) + mastershipState, err := sm.mastershipStore.GetMastership(topoDevice.ID) if err != nil { return err } @@ -65,7 +65,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect } topoDevice.Attributes[mastershipTermKey] = strconv.FormatUint(uint64(mastershipState.Term), 10) - _, err = s.deviceStore.Update(topoDevice) + _, err = sm.deviceStore.Update(topoDevice) if err != nil { log.Errorf("Device %s is not updated %s", id, err.Error()) return err @@ -88,9 +88,9 @@ func remove(s []*topodevice.ProtocolState, i int) []*topodevice.ProtocolState { return s[:len(s)-1] } -func (s *Session) updateDeviceState() error { - log.Info("update device state started") - for event := range s.southboundErrorChan { +func (sm *SessionManager) updateDeviceState() error { + + for event := range sm.southboundErrorChan { log.Info("update event received") switch event.EventType() { case events.EventTypeDeviceConnected: @@ -98,22 +98,30 @@ func (s *Session) updateDeviceState() error { id := topodevice.ID(event.Subject()) // TODO: Retry only on write conflicts return backoff.Retry(func() error { + /* This entire read-modify-write sequence has to be retried until one of two conditions is met: - The update is successful or - Upon retry, the node encounters a mastership term greater than its own */ - currentTerm, err := s.getCurrentTerm() - if err != nil { - return nil - } - mastershipState, err := s.mastershipStore.GetMastership(id) - if err != nil { + session := sm.sessions[id] + if session != nil { + currentTerm, err := session.getCurrentTerm() + if err != nil { + return nil + } + mastershipState, err := sm.mastershipStore.GetMastership(id) + if err != nil { + return nil + } + if uint64(mastershipState.Term) < uint64(currentTerm) { + return nil + } + + } else { return nil } - if uint64(mastershipState.Term) < uint64(currentTerm) { - return nil - } - err = s.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED, + + err := sm.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED, topodevice.ServiceState_AVAILABLE) return err @@ -123,26 +131,28 @@ func (s *Session) updateDeviceState() error { // TODO: Retry only on write conflicts return backoff.Retry(func() error { - currentTerm, err := s.getCurrentTerm() + /*currentTerm, err := sm.sessions[id].getCurrentTerm() if err != nil { return nil } - mastershipState, err := s.mastershipStore.GetMastership(id) + mastershipState, err := sm.mastershipStore.GetMastership(id) if err != nil { return nil } if uint64(mastershipState.Term) < uint64(currentTerm) { return nil - } + }*/ - err = s.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED, + err := sm.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED, topodevice.ServiceState_UNAVAILABLE) return err }, backoff.NewExponentialBackOff()) default: + log.Info("Here 0") } } + log.Info("Here 1") return nil } diff --git a/pkg/southbound/synchronizer/session.go b/pkg/southbound/synchronizer/session.go index 066b209ad..5632d01a6 100644 --- a/pkg/southbound/synchronizer/session.go +++ b/pkg/southbound/synchronizer/session.go @@ -85,12 +85,6 @@ func (s *Session) open() error { if err != nil { return err } - go func() { - err := s.updateDeviceState() - if err != nil { - return - } - }() go func() { connected := false @@ -223,6 +217,7 @@ func (s *Session) synchronize() error { //spawning two go routines to propagate changes and to get operational state //go sync.syncConfigEventsToDevice(target, respChan) s.southboundErrorChan <- events.NewDeviceConnectedEvent(events.EventTypeDeviceConnected, string(s.device.ID)) + log.Info("len southbound err chan:", len(s.southboundErrorChan)) if sync.getStateMode == modelregistry.GetStateOpState { go sync.syncOperationalStateByPartition(ctx, s.target, s.southboundErrorChan) } else if sync.getStateMode == modelregistry.GetStateExplicitRoPaths || diff --git a/pkg/southbound/synchronizer/sessionManager.go b/pkg/southbound/synchronizer/sessionManager.go index 31c01c10b..47dbce313 100644 --- a/pkg/southbound/synchronizer/sessionManager.go +++ b/pkg/southbound/synchronizer/sessionManager.go @@ -148,10 +148,14 @@ func (sm *SessionManager) Start() error { if err != nil { return err } - //errChan := make(chan events.DeviceResponse) - //sm.southboundErrorChan = errChan go sm.processDeviceEvents(sm.topoChannel) + go func() { + err := sm.updateDeviceState() + if err != nil { + return + } + }() return nil } @@ -179,11 +183,14 @@ func (sm *SessionManager) processDeviceEvent(event *topodevice.ListResponse) err return err } + log.Info("len southbound error channel:", len(sm.southboundErrorChan)) + case topodevice.ListResponse_NONE: err := sm.createSession(event.Device) if err != nil { return err } + case topodevice.ListResponse_UPDATED: session, ok := sm.sessions[event.Device.ID] if !ok { @@ -221,11 +228,11 @@ func (sm *SessionManager) createSession(device *topodevice.Device) error { sm.mu.Lock() defer sm.mu.Unlock() - errChan := make(chan events.DeviceResponse) + //errChan := make(chan events.DeviceResponse) session := &Session{ opStateChan: sm.opStateChan, - southboundErrorChan: errChan, + southboundErrorChan: sm.southboundErrorChan, dispatcher: sm.dispatcher, modelRegistry: sm.modelRegistry, operationalStateCache: sm.operationalStateCache, @@ -245,6 +252,8 @@ func (sm *SessionManager) createSession(device *topodevice.Device) error { return err } + log.Info("Here after open session", len(sm.southboundErrorChan)) + // Close the old session and adds the new session to the list of sessions oldSession, ok := sm.sessions[device.ID] if ok {