-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
cookie.go
160 lines (146 loc) · 4.9 KB
/
cookie.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package filewatcher
import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/vercel/turbo/cli/internal/fs"
"github.com/vercel/turbo/cli/internal/turbopath"
)
// CookieWaiter is the interface used by clients that need to wait
// for a roundtrip through the filewatching API.
// *CookieJar, declared in this file, provides a matching WaitForCookie method.
type CookieWaiter interface {
// WaitForCookie returns nil once the roundtrip has been observed,
// or a non-nil error if it could not be confirmed.
WaitForCookie() error
}
// Sentinel errors reported to callers waiting on a cookie.
var (
// ErrCookieTimeout is returned by WaitForCookie when the cookie file was not
// seen in filesystem notifications within the jar's configured timeout.
ErrCookieTimeout = errors.New("timed out waiting for cookie")
// ErrCookieWatchingClosed is returned when the underlying filewatching has been closed:
// it is delivered to every pending waiter by OnFileWatchClosed, and returned
// immediately by WaitForCookie for any wait started afterwards.
ErrCookieWatchingClosed = errors.New("filewatching has closed, cannot watch cookies")
)
// CookieJar is used for tracking roundtrips through the filesystem watching API.
// Each wait creates a uniquely named cookie file inside dir and blocks until the
// matching file event is reported, an error is delivered, or the timeout elapses.
type CookieJar struct {
// timeout bounds how long WaitForCookie waits for its cookie to be observed.
timeout time.Duration
// dir is the directory in which cookie files are created.
dir turbopath.AbsoluteSystemPath
// serial is incremented atomically to give every cookie file a unique name.
serial uint64
// mu guards cookies and closed.
mu sync.Mutex
// cookies maps the path of each pending cookie file to the channel its waiter listens on.
cookies map[turbopath.AbsoluteSystemPath]chan error
// closed is set by OnFileWatchClosed; once true, WaitForCookie rejects new waits.
closed bool
}
// NewCookieJar creates a CookieJar that writes its cookie files into cookieDir and
// gives each wait at most timeout to complete. Only one jar should be live per
// cookieDir at a time, because the jar assumes it owns the directory outright:
// any existing contents are deleted and the directory is recreated empty.
//
// It returns an error if the directory cannot be cleared or created.
func NewCookieJar(cookieDir turbopath.AbsoluteSystemPath, timeout time.Duration) (*CookieJar, error) {
	// Wipe leftovers from a previous run, then recreate the directory.
	// MkdirAll is only attempted when the removal succeeded.
	err := cookieDir.RemoveAll()
	if err == nil {
		err = cookieDir.MkdirAll(0775)
	}
	if err != nil {
		return nil, err
	}

	jar := &CookieJar{
		dir:     cookieDir,
		timeout: timeout,
		cookies: map[turbopath.AbsoluteSystemPath]chan error{},
	}
	return jar, nil
}
// removeAllCookiesWithError fails every pending cookie: for each one it attempts to
// delete the cookie file, delivers err on the waiter's channel, and closes that channel.
// The caller must hold cj.mu. The cookies map is left nil afterwards, so a caller
// that wants the jar to keep accepting cookies must install a fresh map.
func (cj *CookieJar) removeAllCookiesWithError(err error) {
	for cookiePath, waiter := range cj.cookies {
		// Best-effort cleanup; a failure to delete the file is deliberately ignored.
		_ = cookiePath.Remove()
		waiter <- err
		close(waiter)
	}
	// Release the map and everything it references.
	cj.cookies = nil
}
// OnFileWatchClosed is invoked when filewatching has shut down. Every pending
// cookie is failed with ErrCookieWatchingClosed and the jar is marked closed,
// so later calls to WaitForCookie are rejected.
func (cj *CookieJar) OnFileWatchClosed() {
	cj.mu.Lock()
	defer cj.mu.Unlock()

	// Both steps happen under the same lock, so no waiter can register in between.
	cj.removeAllCookiesWithError(ErrCookieWatchingClosed)
	cj.closed = true
}
// OnFileWatchError is invoked when filewatching reports an error. The jar can no
// longer trust its pending cookies, so each of them is failed with err. Unlike
// OnFileWatchClosed, the jar stays open and accepts new cookies afterwards.
func (cj *CookieJar) OnFileWatchError(err error) {
	// Prepared up front; it replaces the map that removeAllCookiesWithError discards.
	fresh := make(map[turbopath.AbsoluteSystemPath]chan error)

	cj.mu.Lock()
	defer cj.mu.Unlock()

	cj.removeAllCookiesWithError(err)
	cj.cookies = fresh
}
// OnFileWatchEvent inspects a filewatching event and, if it is the creation of a
// file inside the cookie directory, wakes the waiter registered for that path.
// Events of any other type are ignored. If the containment check itself fails,
// the failure is handled as a filewatching error via OnFileWatchError.
func (cj *CookieJar) OnFileWatchEvent(ev Event) {
	if ev.EventType != FileAdded {
		return
	}

	jarDir := cj.dir.ToStringDuringMigration()
	eventPath := ev.Path.ToStringDuringMigration()
	isCookie, err := fs.DirContainsPath(jarDir, eventPath)

	switch {
	case err != nil:
		cj.OnFileWatchError(errors.Wrapf(err, "failed to determine if path is a cookie: %v", ev.Path))
	case isCookie:
		cj.notifyCookie(ev.Path, nil)
	}
}
// WaitForCookie touches a unique file, then waits for it to show up in filesystem notifications.
// This provides a theoretical bound on filesystem operations, although it's possible
// that underlying filewatch mechanisms don't respect this ordering.
//
// It returns:
//   - nil once the cookie file has been reported by the file watcher;
//   - ErrCookieWatchingClosed if the jar has already been closed;
//   - the error from creating the cookie file, if that fails;
//   - whatever error the jar delivers to pending cookies (see removeAllCookiesWithError);
//   - ErrCookieTimeout if none of the above happens within the jar's timeout.
func (cj *CookieJar) WaitForCookie() error {
	// Only a single error is ever sent on the channel; the buffer of one
	// guarantees that the sender never blocks, even if nobody is receiving.
	ch := make(chan error, 1)
	serial := atomic.AddUint64(&cj.serial, 1)
	cookiePath := cj.dir.UntypedJoin(fmt.Sprintf("%v.cookie", serial))

	// Register the waiter before touching the file so the event cannot be missed.
	cj.mu.Lock()
	if cj.closed {
		cj.mu.Unlock()
		return ErrCookieWatchingClosed
	}
	cj.cookies[cookiePath] = ch
	cj.mu.Unlock()

	if err := touchCookieFile(cookiePath); err != nil {
		// Unregisters the cookie; the error sent on ch is never read, we return it directly.
		cj.notifyCookie(cookiePath, err)
		return err
	}

	// Use an explicit timer so it can be released as soon as we return,
	// rather than lingering until the full timeout elapses.
	timer := time.NewTimer(cj.timeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		// Nobody is waiting on this cookie anymore. Unregister it so that
		// timed-out waits do not accumulate in the map; a late event for this
		// path then becomes a no-op in notifyCookie.
		cj.mu.Lock()
		delete(cj.cookies, cookiePath)
		cj.mu.Unlock()
		return ErrCookieTimeout
	case err, ok := <-ch:
		if !ok {
			// The channel was closed without an error: the cookie was seen.
			return nil
		}
		// An error was delivered. The sender closes the channel right after
		// sending, so there is no need to wait for the close.
		return err
	}
}
// notifyCookie completes the wait registered for cookie, if there is one. The entry
// is unregistered under the lock; then, outside the lock, err is delivered when it
// is non-nil and the channel is closed. A close without a preceding error tells the
// waiter that the cookie was seen. Unknown cookies are ignored.
func (cj *CookieJar) notifyCookie(cookie turbopath.AbsoluteSystemPath, err error) {
	cj.mu.Lock()
	waiter, pending := cj.cookies[cookie]
	if pending {
		delete(cj.cookies, cookie)
	}
	cj.mu.Unlock()

	if !pending {
		return
	}
	if err != nil {
		waiter <- err
	}
	close(waiter)
}
// touchCookieFile creates the file at cookie (truncating it if it already exists)
// and closes it again right away. It returns the first error encountered while
// opening or closing the file, or nil on success.
func touchCookieFile(cookie turbopath.AbsoluteSystemPath) error {
	const flags = os.O_CREATE | os.O_TRUNC | os.O_WRONLY

	handle, err := cookie.OpenFile(flags, 0700)
	if err != nil {
		return err
	}
	return handle.Close()
}