Skip to content

Commit

Permalink
Merge pull request #37 from safchain/fix_race
Browse files Browse the repository at this point in the history
Add mutex to protect connections fixing a race condition
  • Loading branch information
dave-tucker committed Mar 18, 2016
2 parents 58cf012 + c39b981 commit d5bc040
Showing 1 changed file with 10 additions and 0 deletions.
10 changes: 10 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"log"
"net"
"sync"

"github.com/socketplane/libovsdb/Godeps/_workspace/src/github.com/cenkalti/rpc2"
"github.com/socketplane/libovsdb/Godeps/_workspace/src/github.com/cenkalti/rpc2/jsonrpc"
Expand All @@ -19,6 +20,8 @@ type OvsdbClient struct {

func newOvsdbClient(c *rpc2.Client) *OvsdbClient {
ovs := &OvsdbClient{rpcClient: c, Schema: make(map[string]DatabaseSchema)}
connectionsMutex.Lock()
defer connectionsMutex.Unlock()
if connections == nil {
connections = make(map[*rpc2.Client]*OvsdbClient)
}
Expand All @@ -29,6 +32,7 @@ func newOvsdbClient(c *rpc2.Client) *OvsdbClient {
// Would rather replace this connection map with an OvsdbClient Receiver scoped method
// Unfortunately rpc2 package acts wierd with a receiver scoped method and needs some investigation.
var connections map[*rpc2.Client]*OvsdbClient
var connectionsMutex = &sync.RWMutex{}

const DEFAULT_ADDR = "127.0.0.1"
const DEFAULT_PORT = 6640
Expand Down Expand Up @@ -95,6 +99,8 @@ type NotificationHandler interface {
// RFC 7047 : Section 4.1.6 : Echo
func echo(client *rpc2.Client, args []interface{}, reply *[]interface{}) error {
*reply = args
connectionsMutex.RLock()
defer connectionsMutex.RUnlock()
if _, ok := connections[client]; ok {
for _, handler := range connections[client].handlers {
handler.Echo(nil)
Expand Down Expand Up @@ -128,6 +134,8 @@ func update(client *rpc2.Client, params []interface{}, reply *interface{}) error

// Update the local DB cache with the tableUpdates
tableUpdates := getTableUpdatesFromRawUnmarshal(rowUpdates)
connectionsMutex.RLock()
defer connectionsMutex.RUnlock()
if _, ok := connections[client]; ok {
for _, handler := range connections[client].handlers {
handler.Update(params, tableUpdates)
Expand Down Expand Up @@ -233,6 +241,8 @@ func getTableUpdatesFromRawUnmarshal(raw map[string]map[string]RowUpdate) TableU
}

func clearConnection(c *rpc2.Client) {
connectionsMutex.Lock()
defer connectionsMutex.Unlock()
if _, ok := connections[c]; ok {
for _, handler := range connections[c].handlers {
if handler != nil {
Expand Down

0 comments on commit d5bc040

Please sign in to comment.