Skip to content

Commit

Permalink
Merge pull request #1973 from /issues/678-persist-owned2
Browse files Browse the repository at this point in the history
Persist 'owned' addresses in IPAM
  • Loading branch information
bboreham committed Mar 3, 2016
2 parents bc61838 + 95cbc89 commit 4c159a9
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 6 deletions.
12 changes: 12 additions & 0 deletions common/docker/client.go
Expand Up @@ -94,6 +94,18 @@ func (c *Client) AddObserver(ob ContainerObserver) error {
return nil
}

func (c *Client) AllContainerIDs() ([]string, error) {
all, err := c.ListContainers(docker.ListContainersOptions{All: true})
if err != nil {
return nil, err
}
var ids []string
for _, c := range all {
ids = append(ids, c.ID)
}
return ids, nil
}

// IsContainerNotRunning returns true if we have checked with Docker that the ID is not running
func (c *Client) IsContainerNotRunning(idStr string) bool {
container, err := c.InspectContainer(idStr)
Expand Down
58 changes: 52 additions & 6 deletions ipam/allocator.go
Expand Up @@ -83,7 +83,7 @@ func NewAllocator(ourName mesh.PeerName, ourUID mesh.PeerUID, ourNickname string

// Start runs the allocator goroutine
func (alloc *Allocator) Start() {
alloc.loadPersistedRing()
alloc.loadPersistedData()
actionChan := make(chan func(), mesh.ChannelSize)
alloc.actionChan = actionChan
alloc.ticker = time.NewTicker(tickInterval)
Expand Down Expand Up @@ -267,6 +267,16 @@ func (alloc *Allocator) ContainerStarted(ident string) {
}
}

func (alloc *Allocator) AllContainerIDs(ids []string) {
alloc.actionChan <- func() {
idmap := make(map[string]struct{}, len(ids))
for _, id := range ids {
idmap[id] = struct{}{}
}
alloc.syncOwned(idmap)
}
}

// Delete (Sync) - release all IP addresses for container with given name
func (alloc *Allocator) Delete(ident string) error {
errChan := make(chan error)
Expand Down Expand Up @@ -759,10 +769,11 @@ func (alloc *Allocator) reportFreeSpace() {
alloc.ring.ReportFree(freespace)
}

// Persisting the Ring
// Persistent data
const (
ringIdent = "ring"
nameIdent = "peername"
ringIdent = "ring"
nameIdent = "peername"
ownedIdent = "ownedAddresses"
)

func (alloc *Allocator) persistRing() {
Expand All @@ -776,15 +787,16 @@ func (alloc *Allocator) persistRing() {
}
}

func (alloc *Allocator) loadPersistedRing() {
var checkPeerName mesh.PeerName
func (alloc *Allocator) loadPersistedData() {
checkPeerName := alloc.ourName
if err := alloc.db.Load(nameIdent, &checkPeerName); err != nil {
alloc.fatalf("Error loading persisted peer name: %s", err)
return
}
if checkPeerName != alloc.ourName {
alloc.infof("Deleting persisted data for peername %s", checkPeerName)
alloc.persistRing()
alloc.persistOwned()
return
}
if err := alloc.db.Load(ringIdent, &alloc.ring); err != nil {
Expand All @@ -793,6 +805,20 @@ func (alloc *Allocator) loadPersistedRing() {
if alloc.ring != nil {
alloc.space.UpdateRanges(alloc.ring.OwnedRanges())
}
if err := alloc.db.Load(ownedIdent, &alloc.owned); err != nil {
alloc.fatalf("Error loading persisted address data: %s", err)
}
for _, addrs := range alloc.owned {
for _, addr := range addrs {
alloc.space.Claim(addr)
}
}
}

func (alloc *Allocator) persistOwned() {
if err := alloc.db.Save(ownedIdent, alloc.owned); err != nil {
alloc.fatalf("Error persisting address data: %s", err)
}
}

// Owned addresses
Expand All @@ -809,11 +835,13 @@ func (alloc *Allocator) hasOwned(ident string) bool {
// NB: addr must not be owned by ident already
func (alloc *Allocator) addOwned(ident string, addr address.Address) {
alloc.owned[ident] = append(alloc.owned[ident], addr)
alloc.persistOwned()
}

func (alloc *Allocator) removeAllOwned(ident string) []address.Address {
a := alloc.owned[ident]
delete(alloc.owned, ident)
alloc.persistOwned()
return a
}

Expand All @@ -826,6 +854,7 @@ func (alloc *Allocator) removeOwned(ident string, addrToFree address.Address) bo
} else {
alloc.owned[ident] = append(addrs[:i], addrs[i+1:]...)
}
alloc.persistOwned()
return true
}
}
Expand All @@ -852,6 +881,23 @@ func (alloc *Allocator) findOwner(addr address.Address) string {
return ""
}

// For each ID in the 'owned' map, remove the entry if it isn't in the map
func (alloc *Allocator) syncOwned(ids map[string]struct{}) {
changed := false
for ident, addrs := range alloc.owned {
if _, found := ids[ident]; !found {
for _, addr := range addrs {
alloc.space.Free(addr)
}
delete(alloc.owned, ident)
changed = true
}
}
if changed {
alloc.persistOwned()
}
}

// Logging

func (alloc *Allocator) fatalf(fmt string, args ...interface{}) {
Expand Down
3 changes: 3 additions & 0 deletions prog/weaver/main.go
Expand Up @@ -228,6 +228,9 @@ func main() {
if iprangeCIDR != "" {
allocator, defaultSubnet = createAllocator(router.Router, iprangeCIDR, ipsubnetCIDR, determineQuorum(peerCount, peers), db, isKnownPeer)
observeContainers(allocator)
ids, err := dockerCli.AllContainerIDs()
checkFatal(err)
allocator.AllContainerIDs(ids)
} else if peerCount > 0 {
Log.Fatal("--init-peer-count flag specified without --ipalloc-range")
}
Expand Down
49 changes: 49 additions & 0 deletions test/163_docker_restart_test.sh
@@ -0,0 +1,49 @@
#! /bin/bash

. ./config.sh

check_attached() {
for c in $@; do
assert_raises "exec_on $HOST1 $c $CHECK_ETHWE_UP"
done
}

wait_for_proxy() {
for i in $(seq 1 120); do
echo "Waiting for proxy to start"
if proxy docker_on $1 info > /dev/null 2>&1 ; then
return
fi
sleep 1
done
echo "Timed out waiting for proxy to start" >&2
exit 1
}

start_suite "Containers get same IP address on restart"

# Remove any persisted data from previous runs
run_on $HOST1 "sudo rm -f /tmp/test163-*"

WEAVE_DOCKER_ARGS="-v /tmp:/db --restart=always" weave_on $HOST1 launch-router --db-prefix=/db/test163-
WEAVEPROXY_DOCKER_ARGS=--restart=always weave_on $HOST1 launch-proxy

# Use up first address with throwaway container
start_container $HOST1 --name=c1
# Use sigproxy+sleep to create a container that will die when Docker asks it to.
proxy docker_on $HOST1 run -di --name=c2 --restart=always -dt --entrypoint="/home/weave/sigproxy" weaveworks/weaveexec sleep 600
C2=$(container_ip $HOST1 c2)
assert_raises "[ -n $C2 ]"
check_attached c2

docker_on $HOST1 rm -f c1

# Restart docker daemon, using different commands for systemd- and upstart-managed.
run_on $HOST1 sh -c "command -v systemctl >/dev/null && sudo systemctl restart docker || sudo service docker restart"
wait_for_proxy $HOST1
sleep 3 # allow for re-tries of attach
check_attached c2
# Check same IP address was retained
assert "container_ip $HOST1 c2" "$C2"

end_suite

0 comments on commit 4c159a9

Please sign in to comment.