Skip to content

Commit

Permalink
Merge pull request #27 from mavenugo/app
Browse files Browse the repository at this point in the history
Application async notification infra and test-cases.
  • Loading branch information
nerdalert committed Nov 11, 2014
2 parents cb2fd65 + 0ce5e7f commit ee06ce9
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 10 deletions.
87 changes: 77 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,90 @@ import (
type OvsdbClient struct {
rpcClient *rpc2.Client
Schema map[string]DatabaseSchema
handlers []NotificationHandler
}

func Connect(ipAddr string, port int) (OvsdbClient, error) {
func newOvsdbClient(c *rpc2.Client) *OvsdbClient {
ovs := &OvsdbClient{rpcClient: c, Schema: make(map[string]DatabaseSchema)}
if connections == nil {
connections = make(map[*rpc2.Client]*OvsdbClient)
}
connections[c] = ovs
return ovs
}

// Would rather replace this connection map with an OvsdbClient Receiver scoped method
// Unfortunately rpc2 package acts wierd with a receiver scoped method and needs some investigation.
var connections map[*rpc2.Client]*OvsdbClient

const DEFAULT_ADDR = "127.0.0.1"
const DEFAULT_PORT = 6640

func Connect(ipAddr string, port int) (*OvsdbClient, error) {
if ipAddr == "" {
ipAddr = DEFAULT_ADDR
}

if port <= 0 {
port = DEFAULT_PORT
}

target := fmt.Sprintf("%s:%d", ipAddr, port)
conn, err := net.Dial("tcp", target)

if err != nil {
panic(err)
return nil, err
}

c := rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn))
// Process Async Notifications
c.Handle("echo", echo)
c.Handle("update", update)

go c.Run()
ovs := OvsdbClient{c, make(map[string]DatabaseSchema)}
go handleDisconnectNotification(c)

ovs := newOvsdbClient(c)

// Process Async Notifications
dbs, err := ovs.ListDbs()
if err == nil {
for _, db := range dbs {
schema, err := ovs.GetSchema(db)
if err == nil {
ovs.Schema[db] = *schema
} else {
return nil, err
}
}
}
return ovs, err
return ovs, nil
}

func (ovs OvsdbClient) Disconnect() {
ovs.rpcClient.Close()
func (ovs *OvsdbClient) Register(handler NotificationHandler) {
ovs.handlers = append(ovs.handlers, handler)
}

type NotificationHandler interface {
// RFC 7047 section 4.1.6 Update Notification
Update(context interface{}, tableUpdates TableUpdates)

// RFC 7047 section 4.1.9 Locked Notification
Locked([]interface{})

// RFC 7047 section 4.1.10 Stolen Notification
Stolen([]interface{})

// RFC 7047 section 4.1.11 Echo Notification
Echo([]interface{})
}

// RFC 7047 : Section 4.1.6 : Echo
func echo(client *rpc2.Client, args string, reply *interface{}) error {
*reply = args
if _, ok := connections[client]; ok {
for _, handler := range connections[client].handlers {
handler.Echo(nil)
}
}
return nil
}

Expand Down Expand Up @@ -78,8 +126,10 @@ func update(client *rpc2.Client, params []interface{}, reply *interface{}) error

// Update the local DB cache with the tableUpdates
tableUpdates := getTableUpdatesFromRawUnmarshal(rowUpdates)
if len(tableUpdates.Updates) > 0 {
return nil
if _, ok := connections[client]; ok {
for _, handler := range connections[client].handlers {
handler.Update(params, tableUpdates)
}
}

return nil
Expand Down Expand Up @@ -179,3 +229,20 @@ func getTableUpdatesFromRawUnmarshal(raw map[string]map[string]RowUpdate) TableU
}
return tableUpdates
}

func clearConnection(c *rpc2.Client) {
connections[c] = nil
}

func handleDisconnectNotification(c *rpc2.Client) {
disconnected := c.DisconnectNotify()
select {
case <-disconnected:
clearConnection(c)
}
}

func (ovs OvsdbClient) Disconnect() {
ovs.rpcClient.Close()
clearConnection(ovs.rpcClient)
}
86 changes: 86 additions & 0 deletions ovs_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,49 @@ import (
"log"
"os"
"testing"
"time"
)

func TestConnect(t *testing.T) {
if testing.Short() {
t.Skip()
}

timeoutChan := make(chan bool)
connected := make(chan bool)
go func() {
time.Sleep(10 * time.Second)
timeoutChan <- true
}()

go func() {
// Use Convenience params. Ignore failure even if any
_, err := Connect("", 0)
if err != nil {
log.Println("Couldnt establish OVSDB connection with Defult params. No big deal")
}
}()

go func() {
ovs, err := Connect(os.Getenv("DOCKER_IP"), int(6640))
if err != nil {
connected <- false
} else {
connected <- true
ovs.Disconnect()
}
}()

select {
case <-timeoutChan:
t.Error("Connection Timed Out")
case b := <-connected:
if !b {
t.Error("Couldnt connect to OVSDB Server")
}
}
}

func TestListDbs(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -209,6 +250,51 @@ func TestMonitor(t *testing.T) {
ovs.Disconnect()
}

func TestNotify(t *testing.T) {
if testing.Short() {
t.Skip()
}

ovs, err := Connect(os.Getenv("DOCKER_IP"), int(6640))
if err != nil {
log.Fatal("Failed to Connect. error:", err)
panic(err)
}

notifyEchoChan := make(chan bool)

notifier := Notifier{notifyEchoChan}
ovs.Register(notifier)

timeoutChan := make(chan bool)
go func() {
time.Sleep(10 * time.Second)
timeoutChan <- true
}()

select {
case <-timeoutChan:
t.Error("No Echo message notify in 10 seconds")
case <-notifyEchoChan:
break
}
ovs.Disconnect()
}

type Notifier struct {
echoChan chan bool
}

func (n Notifier) Update(context interface{}, tableUpdates TableUpdates) {
}
func (n Notifier) Locked([]interface{}) {
}
func (n Notifier) Stolen([]interface{}) {
}
func (n Notifier) Echo([]interface{}) {
n.echoChan <- true
}

func TestDBSchemaValidation(t *testing.T) {

if testing.Short() {
Expand Down

0 comments on commit ee06ce9

Please sign in to comment.