Skip to content

Commit

Permalink
NetworkDB backport from master
Browse files Browse the repository at this point in the history
Aligned networkDB folder to master code (commit:fbba5555)

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Sep 22, 2017
1 parent fb9c091 commit 377eda0
Show file tree
Hide file tree
Showing 10 changed files with 849 additions and 442 deletions.
34 changes: 14 additions & 20 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,12 +274,12 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr st
nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID())
logrus.Info("Gossip cluster hostname ", nodeName)

nDB, err := networkdb.New(&networkdb.Config{
BindAddr: listenAddr,
AdvertiseAddr: advertiseAddr,
NodeName: nodeName,
Keys: keys,
})
netDBConf := networkdb.DefaultConfig()
netDBConf.NodeName = nodeName
netDBConf.BindAddr = listenAddr
netDBConf.AdvertiseAddr = advertiseAddr
netDBConf.Keys = keys
nDB, err := networkdb.New(netDBConf)

if err != nil {
return err
Expand Down Expand Up @@ -373,15 +373,11 @@ func (c *controller) agentClose() {

agent.Lock()
for _, cancelFuncs := range agent.driverCancelFuncs {
for _, cancel := range cancelFuncs {
cancelList = append(cancelList, cancel)
}
cancelList = append(cancelList, cancelFuncs...)
}

// Add also the cancel functions for the network db
for _, cancel := range agent.coreCancelFuncs {
cancelList = append(cancelList, cancel)
}
cancelList = append(cancelList, agent.coreCancelFuncs...)
agent.Unlock()

for _, cancel := range cancelList {
Expand Down Expand Up @@ -514,7 +510,6 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
return err
}
}

buf, err := proto.Marshal(&EndpointRecord{
Name: ep.Name(),
ServiceName: ep.svcName,
Expand All @@ -528,7 +523,6 @@ func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
if err != nil {
return err
}

if agent != nil {
if err := agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), buf); err != nil {
logrus.Warnf("addServiceInfoToCluster NetworkDB CreateEntry failed for %s %s err:%s", ep.id, n.id, err)
Expand Down Expand Up @@ -569,12 +563,12 @@ func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) err
if n.ingress {
ingressPorts = ep.ingressPorts
}
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Name(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.name(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster", true); err != nil {
return err
}
} else {
// This is a container simply attached to an attachable network
if err := c.delContainerNameResolution(n.ID(), ep.ID(), ep.Name(), ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
if err := c.delContainerNameResolution(n.ID(), ep.ID(), ep.name(), ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
}
Expand Down Expand Up @@ -758,27 +752,27 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
if svcID != "" {
// This is a remote task part of a service
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed adding service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.addContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed adding container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
} else {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", eid, epRec)
if svcID != "" {
// This is a remote task part of a service
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent", true); err != nil {
logrus.Errorf("failed removing service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed removing service binding for %s epRec:%v err:%v", eid, epRec, err)
return
}
} else {
// This is a remote container simply attached to an attachable network
if err := c.delContainerNameResolution(nid, eid, containerName, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for %s epRec:%v err:%s", eid, epRec, err)
logrus.Errorf("failed removing container name resolution for %s epRec:%v err:%v", eid, epRec, err)
}
}
}
Expand Down
17 changes: 14 additions & 3 deletions networkdb/broadcast.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package networkdb

import (
"fmt"
"errors"
"time"

"github.com/hashicorp/memberlist"
Expand Down Expand Up @@ -86,11 +86,20 @@ func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
notify: notifyCh,
})

nDB.RLock()
noPeers := len(nDB.nodes) <= 1
nDB.RUnlock()

// Message enqueued, do not wait for a send if no peer is present
if noPeers {
return nil
}

// Wait for the broadcast
select {
case <-notifyCh:
case <-time.After(broadcastTimeout):
return fmt.Errorf("timed out broadcasting node event")
return errors.New("timed out broadcasting node event")
}

return nil
Expand All @@ -106,7 +115,7 @@ type tableEventMessage struct {

func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
otherm := other.(*tableEventMessage)
return m.id == otherm.id && m.tname == otherm.tname && m.key == otherm.key
return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key
}

func (m *tableEventMessage) Message() []byte {
Expand All @@ -125,6 +134,8 @@ func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname st
TableName: tname,
Key: key,
Value: entry.value,
// The duration in second is a float that below would be truncated
ResidualReapTime: int32(entry.reapTime.Seconds()),
}

raw, err := encodeMessage(MessageTypeTableEvent, &tEvent)
Expand Down
Loading

0 comments on commit 377eda0

Please sign in to comment.