From 8dcf9960aa81767376e580f20d9ecea3e68f62ad Mon Sep 17 00:00:00 2001 From: Alessandro Boch Date: Mon, 21 Nov 2016 23:38:03 -0800 Subject: [PATCH] Add missing locks in agent and service code Signed-off-by: Alessandro Boch --- agent.go | 137 +++++++++++++++++++++++++++---------------- controller.go | 11 +++- network.go | 13 ++-- networkdb/cluster.go | 6 ++ service_common.go | 5 +- service_linux.go | 2 +- 6 files changed, 110 insertions(+), 64 deletions(-) diff --git a/agent.go b/agent.go index c4f18ba9a8..540aa79cf8 100644 --- a/agent.go +++ b/agent.go @@ -7,6 +7,7 @@ import ( "net" "os" "sort" + "sync" "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/stringid" @@ -39,6 +40,7 @@ type agent struct { advertiseAddr string epTblCancel func() driverCancelFuncs map[string][]func() + sync.Mutex } func getBindAddr(ifaceName string) (string, error) { @@ -86,9 +88,16 @@ func resolveAddr(addrOrInterface string) (string, error) { func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { drvEnc := discoverapi.DriverEncryptionUpdate{} - a := c.agent + a := c.getAgent() + if a == nil { + logrus.Debug("Skipping key change as agent is nil") + return nil + } + // Find the deleted key. If the deleted key was the primary key, // a new primary key should be set before removing if from keyring. + c.Lock() + added := []byte{} deleted := []byte{} j := len(c.keys) for i := 0; i < j; { @@ -127,7 +136,7 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { if !same { c.keys = append(c.keys, key) if key.Subsystem == subsysGossip { - a.networkDB.SetKey(key.Key) + added = key.Key } if key.Subsystem == subsysIPSec { @@ -136,6 +145,11 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { } } } + c.Unlock() + + if len(added) > 0 { + a.networkDB.SetKey(added) + } key, tag, err := c.getPrimaryKeyTag(subsysGossip) if err != nil { @@ -166,8 +180,10 @@ func (c *controller) handleKeyChange(keys []*types.EncryptionKey) error { } func (c *controller) agentSetup() error { + c.Lock() clusterProvider := c.cfg.Daemon.ClusterProvider - + agent := c.agent + c.Unlock() bindAddr := clusterProvider.GetLocalAddress() advAddr := clusterProvider.GetAdvertiseAddress() remote := clusterProvider.GetRemoteAddress() @@ -176,7 +192,7 @@ func (c *controller) agentSetup() error { listenAddr, _, _ := net.SplitHostPort(listen) logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Remote-addr =%s", listenAddr, bindAddr, advAddr, remoteAddr) - if advAddr != "" && c.agent == nil { + if advAddr != "" && agent == nil { if err := c.agentInit(listenAddr, bindAddr, advAddr); err != nil { logrus.Errorf("Error in agentInit : %v", err) } else { @@ -208,6 +224,9 @@ func (c *controller) agentSetup() error { // For a given subsystem getKeys sorts the keys by lamport time and returns // slice of keys and lamport time which can used as a unique tag for the keys func (c *controller) getKeys(subsys string) ([][]byte, []uint64) { + c.Lock() + defer c.Unlock() + sort.Sort(ByTime(c.keys)) keys := [][]byte{} @@ -227,6 +246,8 @@ func (c *controller) getKeys(subsys string) ([][]byte, []uint64) { // getPrimaryKeyTag returns the primary key for a given subsystem from the // list of sorted key and the associated tag func (c *controller) getPrimaryKeyTag(subsys string) ([]byte, uint64, error) { + c.Lock() + defer c.Unlock() sort.Sort(ByTime(c.keys)) keys := []*types.EncryptionKey{} for _, key := range c.keys { @@ -265,6 +286,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st ch, cancel := nDB.Watch("endpoint_table", "", "") + c.Lock() c.agent = &agent{ networkDB: nDB, bindAddr: bindAddr, @@ -272,6 +294,7 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st epTblCancel: cancel, driverCancelFuncs: make(map[string][]func()), } + c.Unlock() go c.handleTableEvents(ch, c.handleEpTableEvent) @@ -294,21 +317,22 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st } func (c *controller) agentJoin(remote string) error { - if c.agent == nil { + agent := c.getAgent() + if agent == nil { return nil } - - return c.agent.networkDB.Join([]string{remote}) + return agent.networkDB.Join([]string{remote}) } func (c *controller) agentDriverNotify(d driverapi.Driver) { - if c.agent == nil { + agent := c.getAgent() + if agent == nil { return } d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{ - Address: c.agent.advertiseAddr, - BindAddress: c.agent.bindAddr, + Address: agent.advertiseAddr, + BindAddress: agent.bindAddr, Self: true, }) @@ -339,11 +363,19 @@ func (c *controller) agentClose() { return } + var cancelList []func() + + agent.Lock() for _, cancelFuncs := range agent.driverCancelFuncs { for _, cancel := range cancelFuncs { - cancel() + cancelList = append(cancelList, cancel) } } + agent.Unlock() + + for _, cancel := range cancelList { + cancel() + } agent.epTblCancel() @@ -354,13 +386,7 @@ func (n *network) isClusterEligible() bool { if n.driverScope() != datastore.GlobalScope { return false } - - c := n.getController() - if c.agent == nil { - return false - } - - return true + return n.getController().getAgent() != nil } func (n *network) joinCluster() error { @@ -368,8 +394,12 @@ func (n *network) joinCluster() error { return nil } - c := n.getController() - return c.agent.networkDB.JoinNetwork(n.ID()) + agent := n.getController().getAgent() + if agent == nil { + return nil + } + + return agent.networkDB.JoinNetwork(n.ID()) } func (n *network) leaveCluster() error { @@ -377,8 +407,12 @@ func (n *network) leaveCluster() error { return nil } - c := n.getController() - return c.agent.networkDB.LeaveNetwork(n.ID()) + agent := n.getController().getAgent() + if agent == nil { + return nil + } + + return agent.networkDB.LeaveNetwork(n.ID()) } func (ep *endpoint) addDriverInfoToCluster() error { @@ -390,10 +424,7 @@ func (ep *endpoint) addDriverInfoToCluster() error { return nil } - ctrlr := n.ctrlr - ctrlr.Lock() - agent := ctrlr.agent - ctrlr.Unlock() + agent := n.getController().getAgent() if agent == nil { return nil } @@ -415,10 +446,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error { return nil } - ctrlr := n.ctrlr - ctrlr.Lock() - agent := ctrlr.agent - ctrlr.Unlock() + agent := n.getController().getAgent() if agent == nil { return nil } @@ -438,6 +466,7 @@ func (ep *endpoint) addServiceInfoToCluster() error { } c := n.getController() + agent := c.getAgent() if !ep.isAnonymous() && ep.Iface().Address() != nil { var ingressPorts []*PortConfig if ep.svcID != "" { @@ -466,8 +495,10 @@ func (ep *endpoint) addServiceInfoToCluster() error { return err } - if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil { - return err + if agent != nil { + if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil { + return err + } } } @@ -481,6 +512,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error { } c := n.getController() + agent := c.getAgent() + if !ep.isAnonymous() { if ep.svcID != "" && ep.Iface().Address() != nil { var ingressPorts []*PortConfig @@ -492,9 +525,10 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error { return err } } - - if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil { - return err + if agent != nil { + if err := agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil { + return err + } } } return nil @@ -506,16 +540,15 @@ func (n *network) addDriverWatches() { } c := n.getController() + agent := c.getAgent() + if agent == nil { + return + } for _, tableName := range n.driverTables { - c.Lock() - if c.agent == nil { - c.Unlock() - return - } - ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "") - c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel) - c.Unlock() - + ch, cancel := agent.networkDB.Watch(tableName, n.ID(), "") + agent.Lock() + agent.driverCancelFuncs[n.ID()] = append(agent.driverCancelFuncs[n.ID()], cancel) + agent.Unlock() go c.handleTableEvents(ch, n.handleDriverTableEvent) d, err := n.driver(false) if err != nil { @@ -523,7 +556,7 @@ func (n *network) addDriverWatches() { return } - c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool { + agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool { if nid == n.ID() { d.EventNotify(driverapi.Create, nid, tableName, key, value) } @@ -538,11 +571,15 @@ func (n *network) cancelDriverWatches() { return } - c := n.getController() - c.Lock() - cancelFuncs := c.agent.driverCancelFuncs[n.ID()] - delete(c.agent.driverCancelFuncs, n.ID()) - c.Unlock() + agent := n.getController().getAgent() + if agent == nil { + return + } + + agent.Lock() + cancelFuncs := agent.driverCancelFuncs[n.ID()] + delete(agent.driverCancelFuncs, n.ID()) + agent.Unlock() for _, cancel := range cancelFuncs { cancel() diff --git a/controller.go b/controller.go index 6eaa1c6638..68bccf074b 100644 --- a/controller.go +++ b/controller.go @@ -237,12 +237,13 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { func (c *controller) SetClusterProvider(provider cluster.Provider) { c.Lock() - defer c.Unlock() c.cfg.Daemon.ClusterProvider = provider + disableProviderCh := c.cfg.Daemon.DisableProvider + c.Unlock() if provider != nil { go c.clusterAgentInit() } else { - c.cfg.Daemon.DisableProvider <- struct{}{} + disableProviderCh <- struct{}{} } } @@ -295,6 +296,12 @@ func (c *controller) SetKeys(keys []*types.EncryptionKey) error { return c.handleKeyChange(keys) } +func (c *controller) getAgent() *agent { + c.Lock() + defer c.Unlock() + return c.agent +} + func (c *controller) clusterAgentInit() { clusterProvider := c.cfg.Daemon.ClusterProvider for { diff --git a/network.go b/network.go index ef40ea2bcc..79e7e49a95 100644 --- a/network.go +++ b/network.go @@ -1485,17 +1485,12 @@ func (n *network) Peers() []networkdb.PeerInfo { return []networkdb.PeerInfo{} } - var nDB *networkdb.NetworkDB - n.ctrlr.Lock() - if n.ctrlr.agentInitDone == nil && n.ctrlr.agent != nil { - nDB = n.ctrlr.agent.networkDB + agent := n.getController().getAgent() + if agent == nil { + return []networkdb.PeerInfo{} } - n.ctrlr.Unlock() - if nDB != nil { - return n.ctrlr.agent.networkDB.Peers(n.id) - } - return []networkdb.PeerInfo{} + return agent.networkDB.Peers(n.ID()) } func (n *network) DriverOptions() map[string]string { diff --git a/networkdb/cluster.go b/networkdb/cluster.go index c871d92f44..2c1a438c54 100644 --- a/networkdb/cluster.go +++ b/networkdb/cluster.go @@ -45,6 +45,8 @@ func (l *logWriter) Write(p []byte) (int, error) { // SetKey adds a new key to the key ring func (nDB *NetworkDB) SetKey(key []byte) { logrus.Debugf("Adding key %s", hex.EncodeToString(key)[0:5]) + nDB.Lock() + defer nDB.Unlock() for _, dbKey := range nDB.config.Keys { if bytes.Equal(key, dbKey) { return @@ -60,6 +62,8 @@ func (nDB *NetworkDB) SetKey(key []byte) { // been added apriori through SetKey func (nDB *NetworkDB) SetPrimaryKey(key []byte) { logrus.Debugf("Primary Key %s", hex.EncodeToString(key)[0:5]) + nDB.RLock() + defer nDB.RUnlock() for _, dbKey := range nDB.config.Keys { if bytes.Equal(key, dbKey) { if nDB.keyring != nil { @@ -74,6 +78,8 @@ func (nDB *NetworkDB) SetPrimaryKey(key []byte) { // can't be the primary key func (nDB *NetworkDB) RemoveKey(key []byte) { logrus.Debugf("Remove Key %s", hex.EncodeToString(key)[0:5]) + nDB.Lock() + defer nDB.Unlock() for i, dbKey := range nDB.config.Keys { if bytes.Equal(key, dbKey) { nDB.config.Keys = append(nDB.config.Keys[:i], nDB.config.Keys[i+1:]...) diff --git a/service_common.go b/service_common.go index a0172f5944..b43c6403f9 100644 --- a/service_common.go +++ b/service_common.go @@ -156,11 +156,10 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in c.Lock() s, ok := c.serviceBindings[skey] + c.Unlock() if !ok { - c.Unlock() return nil } - c.Unlock() s.Lock() lb, ok := s.loadBalancers[nid] @@ -188,7 +187,9 @@ func (c *controller) rmServiceBinding(name, sid, nid, eid string, vip net.IP, in if len(s.loadBalancers) == 0 { // All loadbalancers for the service removed. Time to // remove the service itself. + c.Lock() delete(c.serviceBindings, skey) + c.Unlock() } // Remove loadbalancer service(if needed) and backend in all diff --git a/service_linux.go b/service_linux.go index be8dc84d3e..fbcc89d22e 100644 --- a/service_linux.go +++ b/service_linux.go @@ -34,8 +34,8 @@ func init() { func (n *network) connectedLoadbalancers() []*loadBalancer { c := n.getController() - serviceBindings := make([]*service, 0, len(c.serviceBindings)) c.Lock() + serviceBindings := make([]*service, 0, len(c.serviceBindings)) for _, s := range c.serviceBindings { serviceBindings = append(serviceBindings, s) }