From eefd27999cefc673dc17e7446a229ce009e31a37 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Mon, 10 Nov 2014 22:01:47 -0800 Subject: [PATCH 1/2] Added application notification infra & fixed a few other minor issues Signed-off-by: Madhu Venugopal --- client.go | 87 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 77 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index 4dc71aab..1ef2e65f 100644 --- a/client.go +++ b/client.go @@ -14,42 +14,90 @@ import ( type OvsdbClient struct { rpcClient *rpc2.Client Schema map[string]DatabaseSchema + handlers []NotificationHandler } -func Connect(ipAddr string, port int) (OvsdbClient, error) { +func newOvsdbClient(c *rpc2.Client) *OvsdbClient { + ovs := &OvsdbClient{rpcClient: c, Schema: make(map[string]DatabaseSchema)} + if connections == nil { + connections = make(map[*rpc2.Client]*OvsdbClient) + } + connections[c] = ovs + return ovs +} + +// Would rather replace this connection map with an OvsdbClient Receiver scoped method +// Unfortunately rpc2 package acts wierd with a receiver scoped method and needs some investigation. +var connections map[*rpc2.Client]*OvsdbClient + +const DEFAULT_ADDR = "127.0.0.1" +const DEFAULT_PORT = 6640 + +func Connect(ipAddr string, port int) (*OvsdbClient, error) { + if ipAddr == "" { + ipAddr = DEFAULT_ADDR + } + + if port <= 0 { + port = DEFAULT_PORT + } + target := fmt.Sprintf("%s:%d", ipAddr, port) conn, err := net.Dial("tcp", target) if err != nil { - panic(err) + return nil, err } c := rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn)) - // Process Async Notifications c.Handle("echo", echo) c.Handle("update", update) - go c.Run() - ovs := OvsdbClient{c, make(map[string]DatabaseSchema)} + go handleDisconnectNotification(c) + + ovs := newOvsdbClient(c) + + // Process Async Notifications dbs, err := ovs.ListDbs() if err == nil { for _, db := range dbs { schema, err := ovs.GetSchema(db) if err == nil { ovs.Schema[db] = *schema + } else { + return nil, err } } } - return ovs, err + return ovs, nil } -func (ovs OvsdbClient) Disconnect() { - ovs.rpcClient.Close() +func (ovs *OvsdbClient) Register(handler NotificationHandler) { + ovs.handlers = append(ovs.handlers, handler) +} + +type NotificationHandler interface { + // RFC 7047 section 4.1.6 Update Notification + Update(context interface{}, tableUpdates TableUpdates) + + // RFC 7047 section 4.1.9 Locked Notification + Locked([]interface{}) + + // RFC 7047 section 4.1.10 Stolen Notification + Stolen([]interface{}) + + // RFC 7047 section 4.1.11 Echo Notification + Echo([]interface{}) } // RFC 7047 : Section 4.1.6 : Echo func echo(client *rpc2.Client, args string, reply *interface{}) error { *reply = args + if _, ok := connections[client]; ok { + for _, handler := range connections[client].handlers { + handler.Echo(nil) + } + } return nil } @@ -78,8 +126,10 @@ func update(client *rpc2.Client, params []interface{}, reply *interface{}) error // Update the local DB cache with the tableUpdates tableUpdates := getTableUpdatesFromRawUnmarshal(rowUpdates) - if len(tableUpdates.Updates) > 0 { - return nil + if _, ok := connections[client]; ok { + for _, handler := range connections[client].handlers { + handler.Update(params, tableUpdates) + } } return nil @@ -179,3 +229,20 @@ func getTableUpdatesFromRawUnmarshal(raw map[string]map[string]RowUpdate) TableU } return tableUpdates } + +func clearConnection(c *rpc2.Client) { + connections[c] = nil +} + +func handleDisconnectNotification(c *rpc2.Client) { + disconnected := c.DisconnectNotify() + select { + case <-disconnected: + clearConnection(c) + } +} + +func (ovs OvsdbClient) Disconnect() { + ovs.rpcClient.Close() + clearConnection(ovs.rpcClient) +} From 0ce5e7f1e61ddd44abc194ea03ac993850db6eb4 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Mon, 10 Nov 2014 22:03:50 -0800 Subject: [PATCH 2/2] Test cases for Connect/Timeout and Echo notification Signed-off-by: Madhu Venugopal --- ovs_integration_test.go | 86 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/ovs_integration_test.go b/ovs_integration_test.go index 38f75aa0..00266896 100644 --- a/ovs_integration_test.go +++ b/ovs_integration_test.go @@ -6,8 +6,49 @@ import ( "log" "os" "testing" + "time" ) +func TestConnect(t *testing.T) { + if testing.Short() { + t.Skip() + } + + timeoutChan := make(chan bool) + connected := make(chan bool) + go func() { + time.Sleep(10 * time.Second) + timeoutChan <- true + }() + + go func() { + // Use Convenience params. Ignore failure even if any + _, err := Connect("", 0) + if err != nil { + log.Println("Couldnt establish OVSDB connection with Defult params. No big deal") + } + }() + + go func() { + ovs, err := Connect(os.Getenv("DOCKER_IP"), int(6640)) + if err != nil { + connected <- false + } else { + connected <- true + ovs.Disconnect() + } + }() + + select { + case <-timeoutChan: + t.Error("Connection Timed Out") + case b := <-connected: + if !b { + t.Error("Couldnt connect to OVSDB Server") + } + } +} + func TestListDbs(t *testing.T) { if testing.Short() { t.Skip() @@ -209,6 +250,51 @@ func TestMonitor(t *testing.T) { ovs.Disconnect() } +func TestNotify(t *testing.T) { + if testing.Short() { + t.Skip() + } + + ovs, err := Connect(os.Getenv("DOCKER_IP"), int(6640)) + if err != nil { + log.Fatal("Failed to Connect. error:", err) + panic(err) + } + + notifyEchoChan := make(chan bool) + + notifier := Notifier{notifyEchoChan} + ovs.Register(notifier) + + timeoutChan := make(chan bool) + go func() { + time.Sleep(10 * time.Second) + timeoutChan <- true + }() + + select { + case <-timeoutChan: + t.Error("No Echo message notify in 10 seconds") + case <-notifyEchoChan: + break + } + ovs.Disconnect() +} + +type Notifier struct { + echoChan chan bool +} + +func (n Notifier) Update(context interface{}, tableUpdates TableUpdates) { +} +func (n Notifier) Locked([]interface{}) { +} +func (n Notifier) Stolen([]interface{}) { +} +func (n Notifier) Echo([]interface{}) { + n.echoChan <- true +} + func TestDBSchemaValidation(t *testing.T) { if testing.Short() {