-
Notifications
You must be signed in to change notification settings - Fork 32
/
proxy.go
257 lines (219 loc) · 6.31 KB
/
proxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
package socket
import (
	"context"
	"crypto/tls"
	"encoding/json"
	"log"
	"net"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/facebookgo/httpdown"
	"github.com/go-zoo/bone"
	"github.com/satori/go.uuid"

	"github.com/manifoldco/torus-cli/apitypes"
	"github.com/manifoldco/torus-cli/config"
	"github.com/manifoldco/torus-cli/daemon/db"
	"github.com/manifoldco/torus-cli/daemon/logic"
	"github.com/manifoldco/torus-cli/daemon/observer"
	"github.com/manifoldco/torus-cli/daemon/registry"
	"github.com/manifoldco/torus-cli/daemon/routes"
	"github.com/manifoldco/torus-cli/daemon/session"
)
// AuthProxy exposes an HTTP interface over a domain socket.
// It handles adding auth headers to requests on the `/proxy` endpoint to
// directly proxy requests from the cli to the registry, and exposes an
// interface over `/v1` for secure and composite operations.
type AuthProxy struct {
u *url.URL
l net.Listener
s httpdown.Server
c *config.Config
db *db.DB
sess session.Session
o *observer.Observer
t *http.Transport
client *registry.Client
logic *logic.Engine
}
// NewAuthProxy builds an AuthProxy listening on the domain socket at
// c.SocketPath and proxying to c.RegistryURI. It returns an error if the
// domain socket cannot be created.
//
// When groupShared is true the socket is made readable and writable by both
// the user and the user's group, so several users can reach the daemon. When
// false, only the user running the daemon gets access.
func NewAuthProxy(c *config.Config, sess session.Session, db *db.DB, t *http.Transport,
	client *registry.Client, logic *logic.Engine, groupShared bool) (*AuthProxy, error) {

	listener, err := makeSocket(c.SocketPath, groupShared)
	if err != nil {
		return nil, err
	}

	proxy := &AuthProxy{
		c:      c,
		u:      c.RegistryURI,
		l:      listener,
		t:      t,
		sess:   sess,
		db:     db,
		o:      observer.New(),
		client: client,
		logic:  logic,
	}
	return proxy, nil
}
// CreateHTTPTransport creates and configures the HTTP transport used for
// requests to the registry. TLS verification uses the CA bundle from cfg, and
// the expected server name is the registry host with any port removed.
func CreateHTTPTransport(cfg *config.Config) *http.Transport {
	// Everything before the first ':' in the host is treated as the name.
	// NOTE(review): a bracketed IPv6 literal such as "[::1]:443" would be
	// cut at its first colon — confirm registry hosts are never IPv6.
	hostParts := strings.Split(cfg.RegistryURI.Host, ":")

	tlsConfig := &tls.Config{
		ServerName: hostParts[0],
		RootCAs:    cfg.CABundle,
	}
	return &http.Transport{TLSClientConfig: tlsConfig}
}
// Listen registers the proxy and `/v1` routes, starts the observer, and
// serves HTTP on the domain socket. It blocks until the server stops and
// returns whatever the server's Wait reports.
func (p *AuthProxy) Listen() error {
	// director rewrites an incoming `/proxy/...` request so that it targets
	// the upstream registry, and attaches the daemon's headers.
	director := func(req *http.Request) {
		req.URL.Scheme = p.u.Scheme
		req.URL.Host = p.u.Host
		req.Host = p.u.Host

		// Drop the leading "/proxy" so the registry sees its own path.
		req.URL.Path = req.URL.Path[len("/proxy"):]

		// Only authenticate when the session actually holds a token.
		if token := p.sess.Token(); token != "" {
			req.Header.Set("Authorization", "Bearer "+token)
		}

		req.Header.Set("User-Agent", "Torus-Daemon/"+p.c.Version)
		req.Header.Set("X-Registry-Version", p.c.APIVersion)
	}

	reverseProxy := &httputil.ReverseProxy{
		Transport: p.t,
		Director:  director,
	}

	router := bone.New()

	go p.o.Start()

	router.HandleFunc("/proxy/", proxyCanceler(reverseProxy))
	router.SubRoute("/v1", routes.NewRouteMux(p.c, p.sess, p.db, p.t, p.o, p.client, p.logic))

	server := &http.Server{Handler: requestIDHandler(loggingHandler(router))}

	h := httpdown.HTTP{}
	p.s = h.Serve(server, p.l)

	return p.s.Wait()
}
// Close stops the observer and then stops the server started by Listen,
// returning the result of the server's Stop.
func (p *AuthProxy) Close() error {
	p.o.Stop()

	err := p.s.Stop()
	return err
}
// Addr reports the address of the domain socket this proxy listens on, in
// string form.
func (p *AuthProxy) Addr() string {
	addr := p.l.Addr()
	return addr.String()
}
// loggingHandler wraps next so that every request is logged as
// "<method> <path>" once next has finished handling it.
func loggingHandler(next http.Handler) http.Handler {
	logged := func(w http.ResponseWriter, req *http.Request) {
		// Remember the path up front: downstream handlers may rewrite the
		// request URL, and the log line should show what was asked for.
		requestPath := req.URL.Path

		next.ServeHTTP(w, req)

		log.Printf("%s %s", req.Method, requestPath)
	}
	return http.HandlerFunc(logged)
}
// requestIDHandler wraps next so that every request carries a request id in
// its context under observer.CtxRequestID. The id comes from the incoming
// X-Request-Id header when present; otherwise a fresh v4 UUID is generated.
func requestIDHandler(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		requestID := req.Header.Get("X-Request-Id")
		if len(requestID) == 0 {
			requestID = uuid.NewV4().String()
		}

		withID := context.WithValue(req.Context(), observer.CtxRequestID, requestID)
		next.ServeHTTP(w, req.WithContext(withID))
	})
}
// makeSocket creates a unix domain socket listener at socketPath and sets
// the permissions on the socket file.
//
// Any existing file at the path is removed first. When groupShared is true
// the socket is given mode 0760 (user and group access); otherwise it is
// given mode 0700 (user only).
//
// It returns the listener, or an error if the path cannot be resolved, a
// stale file cannot be removed, or listening or chmod fails. The listener is
// closed again if chmod fails, so no listener is left open on error.
func makeSocket(socketPath string, groupShared bool) (net.Listener, error) {
	absPath, err := filepath.Abs(socketPath)
	if err != nil {
		return nil, err
	}

	// Attempt to remove an existing socket at this path if it exists.
	// Guarding against a server already running is outside the scope of this
	// module.
	if err := os.Remove(absPath); err != nil && !os.IsNotExist(err) {
		return nil, err
	}

	l, err := net.Listen("unix", absPath)
	if err != nil {
		return nil, err
	}

	mode := os.FileMode(0700)
	if groupShared {
		mode = 0760
	}

	// Does not guarantee security; BSD ignores file permissions for sockets
	// see https://github.com/manifoldco/torus-cli/issues/76 for details
	//
	// Use the same resolved path that was listened on, so every step acts on
	// one path.
	if err := os.Chmod(absPath, mode); err != nil {
		// The caller never sees the listener on this path, so it must be
		// closed here. The chmod failure is the error worth reporting; a
		// secondary close error is deliberately dropped.
		_ = l.Close()
		return nil, err
	}

	return l, nil
}
// proxyCanceler supports canceling proxied requests via a timeout, and
// returning a custom error response.
//
// The wrapped handler runs in its own goroutine with a request context that
// expires after six seconds. If it finishes in time its response is passed
// through untouched. If the deadline passes before it has written anything,
// its output is discarded and a 408 response carrying a "request_timeout"
// error body is sent instead. If it had already started writing, the
// response is left as it is.
//
// The returned handler never touches w, and never returns, while the proxy
// goroutine is still running. After a timeout it therefore waits for that
// goroutine, which relies on the wrapped handler giving up once its request
// context is done.
func proxyCanceler(proxy http.Handler) http.HandlerFunc {
	const timeout = 6 * time.Second

	return func(w http.ResponseWriter, r *http.Request) {
		ctx, cancelFunc := context.WithTimeout(context.Background(), timeout)
		defer cancelFunc()

		cw := &cancelingProxyResponseWriter{rw: w}
		r = r.WithContext(ctx)

		done := make(chan struct{})
		go func() {
			defer close(done)
			proxy.ServeHTTP(cw, r)
		}()

		select {
		case <-done:
			return
		case <-ctx.Done():
		}

		// Atomically stop the proxy from starting a response, and learn
		// whether it had already started one.
		ours := cw.takeOver()

		// The context is done, so the proxy should wind down. Wait for it so
		// the two goroutines never use w at the same time, and so w is not
		// used after this handler has returned.
		<-done

		if !ours || ctx.Err() != context.DeadlineExceeded {
			return
		}

		// The proxy may have staged headers on w before it was cut off; they
		// describe the upstream response, not the timeout error sent here.
		hdr := w.Header()
		for k := range hdr {
			delete(hdr, k)
		}

		w.WriteHeader(http.StatusRequestTimeout)
		err := json.NewEncoder(w).Encode(&apitypes.Error{
			Type: "request_timeout",
			Err:  []string{"Request timed out"},
		})
		if err != nil {
			log.Printf("Error writing response timeout: %s", err)
		}
	}
}

// cancelingProxyResponseWriter wraps a regular ResponseWriter to allow it to
// be canceled, discarding anything written to it, providing it has not yet
// been written to.
//
// It is shared between the handler goroutine and the proxy goroutine, so it
// must be used through a pointer and never copied.
type cancelingProxyResponseWriter struct {
	mu       sync.Mutex // guards redirect and written
	redirect bool       // set once the handler has canceled the proxy
	written  bool       // set once a write or header reached rw
	rw       http.ResponseWriter
}

// takeOver marks the writer as canceled. It reports true when nothing had
// been passed to the underlying writer yet, in which case all later writes
// and headers are discarded and the caller owns the response. It reports
// false when a response was already under way; later writes then continue to
// pass through.
func (c *cancelingProxyResponseWriter) takeOver() bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.redirect = true
	return !c.written
}

// Header returns the header map of the underlying ResponseWriter.
func (c *cancelingProxyResponseWriter) Header() http.Header {
	return c.rw.Header()
}

// Write passes b to the underlying ResponseWriter, unless the writer was
// canceled before anything had been written. In that case b is dropped and
// reported as fully written, so the caller sees no error.
func (c *cancelingProxyResponseWriter) Write(b []byte) (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.redirect && !c.written {
		return len(b), nil
	}

	c.written = true
	return c.rw.Write(b)
}

// WriteHeader passes the status code s to the underlying ResponseWriter,
// unless the writer was canceled before anything had been written, in which
// case the call is ignored.
func (c *cancelingProxyResponseWriter) WriteHeader(s int) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.redirect && !c.written {
		return
	}

	c.written = true
	c.rw.WriteHeader(s)
}