Skip to content

Commit

Permalink
Added application notification infra & fixed a few other minor issues
Browse files Browse the repository at this point in the history
Signed-off-by: Madhu Venugopal <madhu@socketplane.io>
  • Loading branch information
mavenugo committed Nov 11, 2014
1 parent cb2fd65 commit eefd279
Showing 1 changed file with 77 additions and 10 deletions.
87 changes: 77 additions & 10 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,90 @@ import (
type OvsdbClient struct {
rpcClient *rpc2.Client
Schema map[string]DatabaseSchema
handlers []NotificationHandler
}

func Connect(ipAddr string, port int) (OvsdbClient, error) {
func newOvsdbClient(c *rpc2.Client) *OvsdbClient {
ovs := &OvsdbClient{rpcClient: c, Schema: make(map[string]DatabaseSchema)}
if connections == nil {
connections = make(map[*rpc2.Client]*OvsdbClient)
}
connections[c] = ovs
return ovs
}

// Would rather replace this connection map with an OvsdbClient Receiver scoped method
// Unfortunately rpc2 package acts wierd with a receiver scoped method and needs some investigation.
var connections map[*rpc2.Client]*OvsdbClient

const DEFAULT_ADDR = "127.0.0.1"
const DEFAULT_PORT = 6640

func Connect(ipAddr string, port int) (*OvsdbClient, error) {
if ipAddr == "" {
ipAddr = DEFAULT_ADDR
}

if port <= 0 {
port = DEFAULT_PORT
}

target := fmt.Sprintf("%s:%d", ipAddr, port)
conn, err := net.Dial("tcp", target)

if err != nil {
panic(err)
return nil, err
}

c := rpc2.NewClientWithCodec(jsonrpc.NewJSONCodec(conn))
// Process Async Notifications
c.Handle("echo", echo)
c.Handle("update", update)

go c.Run()
ovs := OvsdbClient{c, make(map[string]DatabaseSchema)}
go handleDisconnectNotification(c)

ovs := newOvsdbClient(c)

// Process Async Notifications
dbs, err := ovs.ListDbs()
if err == nil {
for _, db := range dbs {
schema, err := ovs.GetSchema(db)
if err == nil {
ovs.Schema[db] = *schema
} else {
return nil, err
}
}
}
return ovs, err
return ovs, nil
}

func (ovs OvsdbClient) Disconnect() {
ovs.rpcClient.Close()
func (ovs *OvsdbClient) Register(handler NotificationHandler) {
ovs.handlers = append(ovs.handlers, handler)
}

type NotificationHandler interface {
// RFC 7047 section 4.1.6 Update Notification
Update(context interface{}, tableUpdates TableUpdates)

// RFC 7047 section 4.1.9 Locked Notification
Locked([]interface{})

// RFC 7047 section 4.1.10 Stolen Notification
Stolen([]interface{})

// RFC 7047 section 4.1.11 Echo Notification
Echo([]interface{})
}

// RFC 7047 : Section 4.1.6 : Echo
func echo(client *rpc2.Client, args string, reply *interface{}) error {
*reply = args
if _, ok := connections[client]; ok {
for _, handler := range connections[client].handlers {
handler.Echo(nil)
}
}
return nil
}

Expand Down Expand Up @@ -78,8 +126,10 @@ func update(client *rpc2.Client, params []interface{}, reply *interface{}) error

// Update the local DB cache with the tableUpdates
tableUpdates := getTableUpdatesFromRawUnmarshal(rowUpdates)
if len(tableUpdates.Updates) > 0 {
return nil
if _, ok := connections[client]; ok {
for _, handler := range connections[client].handlers {
handler.Update(params, tableUpdates)
}
}

return nil
Expand Down Expand Up @@ -179,3 +229,20 @@ func getTableUpdatesFromRawUnmarshal(raw map[string]map[string]RowUpdate) TableU
}
return tableUpdates
}

func clearConnection(c *rpc2.Client) {
connections[c] = nil
}

func handleDisconnectNotification(c *rpc2.Client) {
disconnected := c.DisconnectNotify()
select {
case <-disconnected:
clearConnection(c)
}
}

func (ovs OvsdbClient) Disconnect() {
ovs.rpcClient.Close()
clearConnection(ovs.rpcClient)
}

0 comments on commit eefd279

Please sign in to comment.