Skip to content

Commit

Permalink
[proxy] Remove unneeded channel from attach, and parallelize
Browse files Browse the repository at this point in the history
Trying a slightly simpler way to schedule retrying attaches.
  • Loading branch information
paulbellamy committed Dec 8, 2015
1 parent 2012cef commit 4f8c38c
Showing 1 changed file with 43 additions and 68 deletions.
111 changes: 43 additions & 68 deletions proxy/proxy.go
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -83,45 +82,55 @@ type Proxy struct {
weaveWaitNoopVolume string
normalisedAddrs []string
waiters map[*http.Request]*wait
attachContainer chan attachJob
attachJobs map[string]*attachJob
quit chan struct{}
wg sync.WaitGroup
}

type attachJob struct {
id string
tryAfter time.Time // next time to try this address
tryInterval time.Duration // retry delay on next failure
timer *time.Timer
}

func attachWithRetry(id string) attachJob {
return attachJob{id: id, tryAfter: time.Now(), tryInterval: initialInterval}
}
func (proxy *Proxy) attachWithRetry(id string) {
proxy.Lock()
defer proxy.Unlock()
if j, ok := proxy.attachJobs[id]; ok {
j.timer.Reset(time.Duration(0))
return
}

func attachAndKillOnFailure(id string) attachJob {
return attachJob{id: id}
j := &attachJob{id: id, tryInterval: initialInterval}
j.timer = time.AfterFunc(time.Duration(0), func() {
if err := proxy.attach(id, false); err != nil {
// The delay at the nth retry is a random value in the range
// [i-i/2,i+i/2], where i = initialInterval * 1.5^(n-1).
j.timer.Reset(j.tryInterval)
j.tryInterval = j.tryInterval * 3 / 2
if j.tryInterval > maxInterval {
j.tryInterval = maxInterval
}
return
}
proxy.notifyWaiters(id, nil)
})
proxy.attachJobs[id] = j
}

func (j attachJob) killOnFailure() bool {
return j.tryAfter.IsZero()
func (proxy *Proxy) attachAndKillOnFailure(id string) {
proxy.notifyWaiters(id, proxy.attach(id, true))
}

// The delay at the nth retry is a random value in the range
// [i-i/2,i+i/2], where i = initialInterval * 1.5^(n-1).
func (j *attachJob) nextTryLater() {
j.tryAfter = time.Now().Add(j.tryInterval/2 + time.Duration(rand.Int63n(int64(j.tryInterval))))
j.tryInterval = j.tryInterval * 3 / 2
if j.tryInterval > maxInterval {
j.tryInterval = maxInterval
}
func (j attachJob) Stop() {
j.timer.Stop()
}

func NewProxy(c Config) (*Proxy, error) {
p := &Proxy{
Config: c,
waiters: make(map[*http.Request]*wait),
attachContainer: make(chan attachJob),
quit: make(chan struct{}),
Config: c,
waiters: make(map[*http.Request]*wait),
attachJobs: make(map[string]*attachJob),
quit: make(chan struct{}),
}

if err := p.TLSConfig.LoadCerts(); err != nil {
Expand Down Expand Up @@ -159,16 +168,13 @@ func NewProxy(c Config) (*Proxy, error) {

client.AddObserver(p)

p.wg.Add(1)
go p.containerAttachLoop()

return p, nil
}

func (proxy *Proxy) AttachExistingContainers() {
containers, _ := proxy.client.ListContainers(docker.ListContainersOptions{})
for _, c := range containers {
proxy.attachContainer <- attachWithRetry(c.ID)
proxy.attachWithRetry(c.ID)
}
}

Expand Down Expand Up @@ -364,7 +370,7 @@ func (proxy *Proxy) listen(protoAndAddr string) (net.Listener, string, error) {

// weavedocker.ContainerObserver interface
func (proxy *Proxy) ContainerStarted(ident string) {
proxy.attachContainer <- attachAndKillOnFailure(ident)
proxy.attachAndKillOnFailure(ident)
}

func containerShouldAttach(container *docker.Container) bool {
Expand All @@ -390,6 +396,10 @@ func (proxy *Proxy) removeWait(r *http.Request) {

func (proxy *Proxy) notifyWaiters(ident string, err error) {
proxy.Lock()
if j, ok := proxy.attachJobs[ident]; ok {
j.Stop()
delete(proxy.attachJobs, ident)
}
for _, wait := range proxy.waiters {
if ident == wait.ident && !wait.done {
wait.ch <- err
Expand Down Expand Up @@ -657,46 +667,11 @@ func (proxy *Proxy) symlink(unixAddrs []string) (err error) {
return
}

// containerAttachLoop manages container attachment retry timers, to ensure
// that failed containers are retried. new containers should be sent to
// proxy.attachContainer
func (proxy *Proxy) containerAttachLoop() {
defer proxy.wg.Done()
var (
a attachJob
nextRetry <-chan time.Time
pendingRetries []attachJob
)
now := time.Now()
for {
if len(pendingRetries) > 0 && nextRetry == nil {
nextRetry = time.After(pendingRetries[0].tryAfter.Sub(now))
}
select {
case <-nextRetry:
nextRetry = nil
a = pendingRetries[0]
pendingRetries = pendingRetries[1:]
case a = <-proxy.attachContainer:
for i, j := range pendingRetries { // remove any existing entry for same id
if j.id == a.id {
pendingRetries = append(pendingRetries[:i], pendingRetries[i+1:]...)
break
}
}
case <-proxy.quit:
return
}
if err := proxy.attach(a.id, a.killOnFailure()); err != nil && !a.killOnFailure() {
a.nextTryLater()
pendingRetries = append(pendingRetries, a)
} else {
proxy.notifyWaiters(a.id, err)
}
}
}

func (proxy *Proxy) Stop() {
close(proxy.quit)
proxy.wg.Wait()
proxy.Lock()
defer proxy.Unlock()
for _, j := range proxy.attachJobs {
j.Stop()
}
}

0 comments on commit 4f8c38c

Please sign in to comment.