Skip to content

Commit

Permalink
Merge 82f8e46 into e60257f
Browse files Browse the repository at this point in the history
  • Loading branch information
Adib Rastegarnia committed Sep 11, 2020
2 parents e60257f + 82f8e46 commit ab3b062
Show file tree
Hide file tree
Showing 13 changed files with 1,355 additions and 621 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.14

require (
github.com/atomix/go-client v0.2.3
cloud.google.com/go v0.43.0 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/docker/docker v1.13.1
github.com/gogo/protobuf v1.3.1
Expand Down Expand Up @@ -31,6 +32,8 @@ require (
github.com/spf13/viper v1.6.2
github.com/stretchr/testify v1.5.1
go.uber.org/multierr v1.4.0 // indirect
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 // indirect
golang.org/x/tools v0.0.0-20200113040837-eac381796e91 // indirect
google.golang.org/genproto v0.0.0-20200212174721-66ed5ce911ce // indirect
google.golang.org/grpc v1.31.1
gopkg.in/yaml.v2 v2.2.8
Expand Down
95 changes: 0 additions & 95 deletions pkg/manager/devicehandler.go

This file was deleted.

48 changes: 10 additions & 38 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package manager

import (
"fmt"
"strings"
"sync"

devicechange "github.com/onosproject/onos-config/api/types/change/device"
Expand Down Expand Up @@ -95,6 +94,7 @@ func NewManager(leadershipStore leadership.Store, mastershipStore mastership.Sto
DeviceStateStore: deviceStateStore,
DeviceStore: deviceStore,
DeviceCache: deviceCache,
MastershipStore: mastershipStore,
NetworkChangesStore: networkChangesStore,
NetworkSnapshotStore: networkSnapshotStore,
DeviceSnapshotStore: deviceSnapshotStore,
Expand All @@ -105,7 +105,7 @@ func NewManager(leadershipStore leadership.Store, mastershipStore mastership.Sto
TopoChannel: make(chan *topodevice.ListResponse, 10),
ModelRegistry: modelReg,
OperationalStateChannel: make(chan events.OperationalStateEvent),
SouthboundErrorChan: make(chan events.DeviceResponse, 10),
SouthboundErrorChan: make(chan events.DeviceResponse),
Dispatcher: dispatcher.NewDispatcher(),
OperationalStateCache: make(map[topodevice.ID]devicechange.TypedValueMap),
OperationalStateCacheLock: &sync.RWMutex{},
Expand Down Expand Up @@ -146,32 +146,30 @@ func (m *Manager) Run() {

// Start the main dispatcher system
go m.Dispatcher.ListenOperationalState(m.OperationalStateChannel)
// Listening for errors in the Southbound
go listenOnResponseChannel(m.SouthboundErrorChan, m)

factory, err := synchronizer.NewFactory(
sessionManager, err := synchronizer.NewSessionManager(
synchronizer.WithTopoChannel(m.TopoChannel),
synchronizer.WithOpStateChannel(m.OperationalStateChannel),
synchronizer.WithSouthboundErrChan(m.SouthboundErrorChan),
synchronizer.WithDispatcher(m.Dispatcher),
synchronizer.WithModelRegistry(m.ModelRegistry),
synchronizer.WithOperationalStateCache(m.OperationalStateCache),
synchronizer.WithNewTargetFn(southbound.TargetGenerator),
synchronizer.WithOperationalStateCacheLock(m.OperationalStateCacheLock),
synchronizer.WithDeviceChangeStore(m.DeviceChangesStore),
synchronizer.WithMastershipStore(m.MastershipStore),
synchronizer.WithDeviceStore(m.DeviceStore),
synchronizer.WithSessions(make(map[topodevice.ID]*synchronizer.Session)),
)

if err != nil {
log.Error("Error in creating device factory", err)
log.Error("Error in creating session manager", err)
}

go factory.TopoEventHandler()
log.Debug("Device Event Handler started")

err = m.DeviceStore.Watch(m.TopoChannel)
err = sessionManager.Start()
if err != nil {
log.Error("Error Watching devices", err)
log.Errorf("Error in starting session manager", err)
}

log.Info("Manager Started")
}

Expand All @@ -188,32 +186,6 @@ func GetManager() *Manager {
return &mgr
}

func listenOnResponseChannel(respChan chan events.DeviceResponse, m *Manager) {
log.Info("Listening for Errors in Manager")
for event := range respChan {
subject := topodevice.ID(event.Subject())
switch event.EventType() {
case events.EventTypeDeviceConnected:
err := m.DeviceConnected(subject)
if err != nil {
log.Error("Can't notify connection", err)
}
case events.EventTypeErrorDeviceConnect:
err := m.DeviceDisconnected(subject, event.Error())
if err != nil {
log.Error("Can't notify disconnection", err)
}
default:
if strings.Contains(event.Error().Error(), "desc =") {
log.Errorf("Error reported to channel %s",
strings.Split(event.Error().Error(), " desc = ")[1])
} else {
log.Error("Response reported to channel ", event.Error().Error())
}
}
}
}

// ComputeDeviceChange computes a given device change the given updates and deletes, according to the path
// on the configuration for the specified target
func (m *Manager) ComputeDeviceChange(deviceName devicetype.ID, version devicetype.Version,
Expand Down
81 changes: 2 additions & 79 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package manager

import (
"errors"

"github.com/golang/mock/gomock"
changetypes "github.com/onosproject/onos-config/api/types/change"
devicechange "github.com/onosproject/onos-config/api/types/change/device"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/openconfig/goyang/pkg/yang"
"github.com/openconfig/ygot/ygot"
"gotest.tools/assert"

"strings"
"testing"
"time"
Expand Down Expand Up @@ -626,85 +628,6 @@ func TestManager_GetTargetState(t *testing.T) {
assert.Assert(t, len(stateBad) == 0, "Bad path entry has incorrect length %d", len(stateBad))
}

func TestManager_DeviceConnected(t *testing.T) {
mgrTest, mocks := setUp(t)
const (
device1 = "device1"
)

deviceDisconnected := &topodevice.Device{
ID: "device1",
Revision: 1,
Address: "device1:1234",
Version: deviceVersion1,
}

device1Connected := &topodevice.Device{
ID: "device1",
Revision: 1,
Address: "device1:1234",
Version: deviceVersion1,
}

mocks.MockStores.DeviceStore.EXPECT().Get("device1")

protocolState := new(topodevice.ProtocolState)
protocolState.Protocol = topodevice.Protocol_GNMI
protocolState.ConnectivityState = topodevice.ConnectivityState_REACHABLE
protocolState.ChannelState = topodevice.ChannelState_CONNECTED
protocolState.ServiceState = topodevice.ServiceState_AVAILABLE
device1Connected.Protocols = append(device1Connected.Protocols, protocolState)

mocks.MockStores.DeviceStore.EXPECT().Get(gomock.Any()).Return(deviceDisconnected, nil)
mocks.MockStores.DeviceStore.EXPECT().Update(gomock.Any()).Return(device1Connected, nil)

err := mgrTest.DeviceConnected(device1)
assert.NilError(t, err)
}

func TestManager_DeviceDisconnected(t *testing.T) {
mgrTest, mocks := setUp(t)
const (
device1 = "device1"
)

deviceDisconnected := &topodevice.Device{
ID: "device1",
Revision: 1,
Address: "device1:1234",
Version: deviceVersion1,
}

device1Connected := &topodevice.Device{
ID: "device1",
Revision: 1,
Address: "device1:1234",
Version: deviceVersion1,
}

mocks.MockStores.DeviceStore.EXPECT().Get("device1")

protocolState := new(topodevice.ProtocolState)
protocolState.Protocol = topodevice.Protocol_GNMI
protocolState.ConnectivityState = topodevice.ConnectivityState_REACHABLE
protocolState.ChannelState = topodevice.ChannelState_CONNECTED
protocolState.ServiceState = topodevice.ServiceState_AVAILABLE
device1Connected.Protocols = append(device1Connected.Protocols, protocolState)

protocolStateDisconnected := new(topodevice.ProtocolState)
protocolStateDisconnected.Protocol = topodevice.Protocol_GNMI
protocolStateDisconnected.ConnectivityState = topodevice.ConnectivityState_UNREACHABLE
protocolStateDisconnected.ChannelState = topodevice.ChannelState_DISCONNECTED
protocolStateDisconnected.ServiceState = topodevice.ServiceState_UNAVAILABLE
deviceDisconnected.Protocols = append(device1Connected.Protocols, protocolState)

mocks.MockStores.DeviceStore.EXPECT().Get(gomock.Any()).Return(device1Connected, nil)
mocks.MockStores.DeviceStore.EXPECT().Update(gomock.Any()).Return(deviceDisconnected, nil)

err := mgrTest.DeviceDisconnected(device1, errors.New("device reported disconnection"))
assert.NilError(t, err)
}

type MockModelPlugin struct{}

func (m MockModelPlugin) ModelData() (string, string, []*gnmi.ModelData, string) {
Expand Down
Loading

0 comments on commit ab3b062

Please sign in to comment.