Skip to content

Commit

Permalink
For debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
adibrastegarnia committed Sep 3, 2020
1 parent 7317aa9 commit 67ac16d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 32 deletions.
2 changes: 1 addition & 1 deletion pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewManager(leadershipStore leadership.Store, mastershipStore mastership.Sto
TopoChannel: make(chan *topodevice.ListResponse, 10),
ModelRegistry: modelReg,
OperationalStateChannel: make(chan events.OperationalStateEvent),
SouthboundErrorChan: make(chan events.DeviceResponse, 10),
SouthboundErrorChan: make(chan events.DeviceResponse),
Dispatcher: dispatcher.NewDispatcher(),
OperationalStateCache: make(map[topodevice.ID]devicechange.TypedValueMap),
OperationalStateCacheLock: &sync.RWMutex{},
Expand Down
52 changes: 31 additions & 21 deletions pkg/southbound/synchronizer/deviceUpdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
topodevice "github.com/onosproject/onos-topo/api/device"
)

func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
func (sm *SessionManager) updateDevice(id topodevice.ID, connectivity topodevice.ConnectivityState, channel topodevice.ChannelState,
service topodevice.ServiceState) error {
log.Info("Update device state")

topoDevice, err := s.deviceStore.Get(id)
topoDevice, err := sm.deviceStore.Get(id)
st, ok := status.FromError(err)

// If the device doesn't exist then we should not update its state
Expand All @@ -55,7 +55,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect
protocolState.ChannelState = channel
protocolState.ServiceState = service
topoDevice.Protocols = append(topoDevice.Protocols, protocolState)
mastershipState, err := s.mastershipStore.GetMastership(topoDevice.ID)
mastershipState, err := sm.mastershipStore.GetMastership(topoDevice.ID)
if err != nil {
return err
}
Expand All @@ -65,7 +65,7 @@ func (s *Session) updateDevice(id topodevice.ID, connectivity topodevice.Connect
}

topoDevice.Attributes[mastershipTermKey] = strconv.FormatUint(uint64(mastershipState.Term), 10)
_, err = s.deviceStore.Update(topoDevice)
_, err = sm.deviceStore.Update(topoDevice)
if err != nil {
log.Errorf("Device %s is not updated %s", id, err.Error())
return err
Expand All @@ -88,32 +88,40 @@ func remove(s []*topodevice.ProtocolState, i int) []*topodevice.ProtocolState {
return s[:len(s)-1]
}

func (s *Session) updateDeviceState() error {
log.Info("update device state started")
for event := range s.southboundErrorChan {
func (sm *SessionManager) updateDeviceState() error {

for event := range sm.southboundErrorChan {
log.Info("update event received")
switch event.EventType() {
case events.EventTypeDeviceConnected:
log.Info("Device connected")
id := topodevice.ID(event.Subject())
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {

/* This entire read-modify-write sequence has to be retried until one of two conditions is met:
- The update is successful or
- Upon retry, the node encounters a mastership term greater than its own */

currentTerm, err := s.getCurrentTerm()
if err != nil {
return nil
}
mastershipState, err := s.mastershipStore.GetMastership(id)
if err != nil {
session := sm.sessions[id]
if session != nil {
currentTerm, err := session.getCurrentTerm()
if err != nil {
return nil
}
mastershipState, err := sm.mastershipStore.GetMastership(id)
if err != nil {
return nil
}
if uint64(mastershipState.Term) < uint64(currentTerm) {
return nil
}

} else {
return nil
}
if uint64(mastershipState.Term) < uint64(currentTerm) {
return nil
}
err = s.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,

err := sm.updateDevice(id, topodevice.ConnectivityState_REACHABLE, topodevice.ChannelState_CONNECTED,
topodevice.ServiceState_AVAILABLE)
return err

Expand All @@ -123,26 +131,28 @@ func (s *Session) updateDeviceState() error {
// TODO: Retry only on write conflicts
return backoff.Retry(func() error {

currentTerm, err := s.getCurrentTerm()
/*currentTerm, err := sm.sessions[id].getCurrentTerm()
if err != nil {
return nil
}
mastershipState, err := s.mastershipStore.GetMastership(id)
mastershipState, err := sm.mastershipStore.GetMastership(id)
if err != nil {
return nil
}
if uint64(mastershipState.Term) < uint64(currentTerm) {
return nil
}
}*/

err = s.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,
err := sm.updateDevice(id, topodevice.ConnectivityState_UNREACHABLE, topodevice.ChannelState_DISCONNECTED,
topodevice.ServiceState_UNAVAILABLE)
return err
}, backoff.NewExponentialBackOff())

default:
log.Info("Here 0")
}
}

log.Info("Here 1")
return nil
}
7 changes: 1 addition & 6 deletions pkg/southbound/synchronizer/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ func (s *Session) open() error {
if err != nil {
return err
}
go func() {
err := s.updateDeviceState()
if err != nil {
return
}
}()

go func() {
connected := false
Expand Down Expand Up @@ -223,6 +217,7 @@ func (s *Session) synchronize() error {
//spawning two go routines to propagate changes and to get operational state
//go sync.syncConfigEventsToDevice(target, respChan)
s.southboundErrorChan <- events.NewDeviceConnectedEvent(events.EventTypeDeviceConnected, string(s.device.ID))
log.Info("len southbound err chan:", len(s.southboundErrorChan))
if sync.getStateMode == modelregistry.GetStateOpState {
go sync.syncOperationalStateByPartition(ctx, s.target, s.southboundErrorChan)
} else if sync.getStateMode == modelregistry.GetStateExplicitRoPaths ||
Expand Down
17 changes: 13 additions & 4 deletions pkg/southbound/synchronizer/sessionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,14 @@ func (sm *SessionManager) Start() error {
if err != nil {
return err
}
//errChan := make(chan events.DeviceResponse)
//sm.southboundErrorChan = errChan

go sm.processDeviceEvents(sm.topoChannel)
go func() {
err := sm.updateDeviceState()
if err != nil {
return
}
}()

return nil
}
Expand Down Expand Up @@ -179,11 +183,14 @@ func (sm *SessionManager) processDeviceEvent(event *topodevice.ListResponse) err
return err
}

log.Info("len southbound error channel:", len(sm.southboundErrorChan))

case topodevice.ListResponse_NONE:
err := sm.createSession(event.Device)
if err != nil {
return err
}

case topodevice.ListResponse_UPDATED:
session, ok := sm.sessions[event.Device.ID]
if !ok {
Expand Down Expand Up @@ -221,11 +228,11 @@ func (sm *SessionManager) createSession(device *topodevice.Device) error {
sm.mu.Lock()
defer sm.mu.Unlock()

errChan := make(chan events.DeviceResponse)
//errChan := make(chan events.DeviceResponse)

session := &Session{
opStateChan: sm.opStateChan,
southboundErrorChan: errChan,
southboundErrorChan: sm.southboundErrorChan,
dispatcher: sm.dispatcher,
modelRegistry: sm.modelRegistry,
operationalStateCache: sm.operationalStateCache,
Expand All @@ -245,6 +252,8 @@ func (sm *SessionManager) createSession(device *topodevice.Device) error {
return err
}

log.Info("Here after open session", len(sm.southboundErrorChan))

// Close the old session and adds the new session to the list of sessions
oldSession, ok := sm.sessions[device.ID]
if ok {
Expand Down

0 comments on commit 67ac16d

Please sign in to comment.