Skip to content

Commit

Permalink
Fix subscribe state gnmi test
Browse files Browse the repository at this point in the history
  • Loading branch information
adibrastegarnia committed Aug 11, 2020
1 parent a6056d8 commit 07511f4
Showing 1 changed file with 50 additions and 23 deletions.
73 changes: 50 additions & 23 deletions test/gnmi/subscribestategnmitest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ import (
"testing"
"time"

"fmt"

protobuf "github.com/golang/protobuf/proto"
"github.com/onosproject/onos-config/pkg/utils"
"github.com/openconfig/gnmi/client"
gpb "github.com/openconfig/gnmi/proto/gnmi"
ocgnmi "github.com/openconfig/gnmi/proto/gnmi"

"github.com/onosproject/onos-config/test/utils/gnmi"
"github.com/onosproject/onos-topo/api/device"
"github.com/stretchr/testify/assert"
)

// TestSubscribeStateGnmi tests a stream subscription to updates to a device using the diags API
// TestSubscribeStateGnmi tests a stream subscription to updates to a device
func (s *TestSuite) TestSubscribeStateGnmi(t *testing.T) {
t.Skip()
const dateTimePath = "/system/state/current-datetime"

previousTime = time.Now().Add(-5 * time.Second)
Expand All @@ -42,7 +44,6 @@ func (s *TestSuite) TestSubscribeStateGnmi(t *testing.T) {

// Wait for config to connect to the device
gnmi.WaitForDeviceAvailable(t, deviceID, 10*time.Second)
time.Sleep(250 * time.Millisecond)

// Make a GNMI client to use for subscribe
subC := client.BaseClient{}
Expand All @@ -56,36 +57,62 @@ func (s *TestSuite) TestSubscribeStateGnmi(t *testing.T) {
subReq := subscribeRequest{
path: path,
subListMode: gpb.SubscriptionList_STREAM,
subStreamMode: gpb.SubscriptionMode_TARGET_DEFINED,
subStreamMode: gpb.SubscriptionMode_ON_CHANGE,
}

q, errQuery := buildQueryRequest(subReq)
assert.NoError(t, errQuery, "Can't build Query")
updateCount := 0
syncCount := 0
done := make(chan bool, 1)

q.ProtoHandler = func(msg protobuf.Message) error {
// TODO subscription handler

resp, ok := msg.(*ocgnmi.SubscribeResponse)
if !ok {
return fmt.Errorf("failed to type assert message %#v", msg)
}

switch v := resp.Response.(type) {
case *gpb.SubscribeResponse_Update:
validateGnmiStateResponse(t, resp, simulator.Name())
s.mux.Lock()
updateCount++
s.mux.Unlock()

case *gpb.SubscribeResponse_Error:
return fmt.Errorf("error in response: %s", v)
case *gpb.SubscribeResponse_SyncResponse:
s.mux.Lock()
syncCount++
s.mux.Unlock()
validateGnmiStateResponse(t, resp, simulator.Name())

default:
return fmt.Errorf("unknown response %T: %s", v, v)
}

if updateCount == expectedNumUpdates && syncCount == expectedNumSyncs {
done <- true
}
return nil
}

// Subscription has to be spawned into a separate thread as it is blocking.
//go func() {
errSubscribe := subC.Subscribe(gnmi.MakeContext(), *q, "gnmi")
assert.NoError(t, errSubscribe, "Subscription Error %v", errSubscribe)
//}()

// Sleeping in order to make sure the subscribe request is properly stored and processed.
/*time.Sleep(100000)
i := 1
for i <= 10 {
select {
case response = <-respChan:
validateGnmiStateResponse(t, response, simulator.Name())
i++
case <-time.After(10 * time.Second):
assert.FailNow(t, "Expected Update Response")
}
}*/

go func() {
_ = subC.Subscribe(gnmi.MakeContext(), *q, "gnmi")
defer subC.Close()

}()

select {
case <-done:
gnmi.DeleteSimulator(t, simulator)
case <-time.After(timeOut * time.Second):
assert.FailNow(t, "timed out; the request should be processed under %d:", timeOut)
}

}

func validateGnmiStateResponse(t *testing.T, resp *gpb.SubscribeResponse, device string) {
Expand Down

0 comments on commit 07511f4

Please sign in to comment.