forked from juju/juju
/
manifold.go
122 lines (108 loc) · 2.95 KB
/
manifold.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// Copyright 2015 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.
package gate
import (
"sync"
"github.com/juju/errors"
"gopkg.in/tomb.v1"
"github.com/juju/juju/worker"
"github.com/juju/juju/worker/dependency"
)
// Manifold returns a dependency.Manifold that wraps a single channel, shared
// across all workers returned by the start func; it can be used to synchronize
// operations across manifolds that lack direct dependency relationships.
//
// The output func accepts an out pointer to either an Unlocker or a Waiter.
func Manifold() dependency.Manifold {
return ManifoldEx(NewLock())
}
// ManifoldEx does the same thing as Manifold but takes the
// Lock which used to wait on or unlock the gate. This
// allows code running outside of a dependency engine managed worker
// to monitor or unlock the gate.
//
// TODO(mjs) - this can likely go away once all machine agent workers
// are running inside the dependency engine.
func ManifoldEx(lock Lock) dependency.Manifold {
return dependency.Manifold{
Start: func(_ dependency.Context) (worker.Worker, error) {
w := &gate{lock: lock}
go func() {
defer w.tomb.Done()
<-w.tomb.Dying()
}()
return w, nil
},
Output: func(in worker.Worker, out interface{}) error {
inWorker, _ := in.(*gate)
if inWorker == nil {
return errors.Errorf("in should be a *gate; is %#v", in)
}
switch outPointer := out.(type) {
case *Unlocker:
*outPointer = inWorker.lock
case *Waiter:
*outPointer = inWorker.lock
case *Lock:
*outPointer = inWorker.lock
default:
return errors.Errorf("out should be a *Unlocker, *Waiter, *Lock; is %#v", out)
}
return nil
},
}
}
// NewLock returns a new Lock for the gate manifold, suitable for
// passing to ManifoldEx. It can be safely unlocked and monitored by
// code running inside or outside of the dependency engine.
func NewLock() Lock {
return &lock{
// mu and ch are shared across all workers started by the returned manifold.
// In normal operation, there will only be one such worker at a time; but if
// multiple workers somehow run in parallel, mu should prevent panic and/or
// confusion.
mu: new(sync.Mutex),
ch: make(chan struct{}),
}
}
// Lock implements of Unlocker and Waiter
type lock struct {
mu *sync.Mutex
ch chan struct{}
}
// Unlock implements Unlocker.
func (l *lock) Unlock() {
l.mu.Lock()
defer l.mu.Unlock()
select {
case <-l.ch:
default:
close(l.ch)
}
}
// Unlocked implements Waiter.
func (l *lock) Unlocked() <-chan struct{} {
return l.ch
}
// IsUnlocked implements Waiter.
func (l *lock) IsUnlocked() bool {
select {
case <-l.ch:
return true
default:
return false
}
}
// gate implements a degenerate worker that holds a Lock.
type gate struct {
tomb tomb.Tomb
lock Lock
}
// Kill is part of the worker.Worker interface.
func (w *gate) Kill() {
w.tomb.Kill(nil)
}
// Wait is part of the worker.Worker interface.
func (w *gate) Wait() error {
return w.tomb.Wait()
}