Skip to content
31 changes: 31 additions & 0 deletions address/granter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ import (
"gopkg.in/m-lab/pipe.v3"
)

// Manager manages access to a device by IP and port.
type Manager interface {
Start(port, device string) error
Grant(ip net.IP) error
Revoke(ip net.IP) error
Stop() ([]byte, error)
}

// IPManager supports granting IP subnet access using iptables or ip6tables.
type IPManager struct {
*semaphore.Weighted
Expand Down Expand Up @@ -75,3 +83,26 @@ func cmdForIP(ip net.IP) (string, string) {
}
return iptables, "/64"
}

// NullManager implements the address.Manager interface while doing nothing.
type NullManager struct{}

// Grant does nothing with the given ip.
func (r *NullManager) Grant(ip net.IP) error {
return nil
}

// Revoke does nothing with the given ip.
func (r *NullManager) Revoke(ip net.IP) error {
return nil
}

// Start does nothing to the given port or device.
func (r *NullManager) Start(port, device string) error {
return nil
}

// Stop does nothing.
func (r *NullManager) Stop() ([]byte, error) {
return nil, nil
}
19 changes: 19 additions & 0 deletions address/granter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,22 @@ func TestIPManager(t *testing.T) {
}
wg.Wait()
}

// TestNullManager verifies that the NullManager does nothing.
func TestNullManager(t *testing.T) {
t.Run("null-manager", func(t *testing.T) {
r := &NullManager{}
if err := r.Grant(net.ParseIP("127.0.0.1")); err != nil {
t.Errorf("NullManager.Grant() error = %v, want nil", err)
}
if err := r.Revoke(net.ParseIP("127.0.0.1")); err != nil {
t.Errorf("NullManager.Revoke() error = %v, want nil", err)
}
if err := r.Start("1234", "eth0"); err != nil {
t.Errorf("NullManager.Start() error = %v, want nil", err)
}
if _, err := r.Stop(); err != nil {
t.Errorf("NullManager.Stop() error = %v, want nil", err)
}
})
}
16 changes: 16 additions & 0 deletions chanio/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package chanio

import "io"

// ReadOnce reads from the given reader once and closes the returned channel. All data is discarded.
func ReadOnce(r io.Reader) <-chan struct{} {
c := make(chan struct{})
go func() {
b := make([]byte, 1)
// Block on read. Will return on EOF or when client sends data (which is discarded).
r.Read(b)
// Close channel to send reader a signal.
close(c)
}()
return c
}
25 changes: 25 additions & 0 deletions chanio/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package chanio

import (
"bytes"
"context"
"testing"
"time"
)

func TestReadOnce(t *testing.T) {
t.Run("okay", func(t *testing.T) {
b := bytes.NewBufferString("message")
got := ReadOnce(b)
// Absolute timeout. Should never be reached.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

select {
case <-got:
// success
case <-ctx.Done():
t.Errorf("ReadOnce() = context should never timeout")
}
})
}
23 changes: 23 additions & 0 deletions cmd/envelope/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,26 @@ eyJhdWQiOlsibWxhYjEubGdhMDMiXSwiZXhwIjoxNTg0NTAyMjEyLCJpc3MiOiJsb2NhdGUubWVhc3Vy
nRsYWIubmV0Iiwic3ViIjoiMTI3LjAuMC4yIn0.FZSjjDjWJVGSKzJKJP5Cbaacp8PNqGX5_zETe3SQsXvhlo
hGlAlKLdhDkjBDIKttXkO3BL5xyQ09cVGfmbelDA
```

### Local development without access tokens

Start the access envelope server, without requiring access tokens (and
without iptables management; by default these are both required).

```sh
~/bin/envelope -envelope.token-required=false
```

Connect to the local access envelope using `curl`. When tokens are not
required, the default timeout is 60s. After this timeout, the server will
hangup automatically.

```sh
curl --no-buffer \
--header "Connection: Upgrade" \
--header "Upgrade: websocket" \
--header "Sec-WebSocket-Protocol: net.measurementlab.envelope" \
--header "Sec-WebSocket-Version: 13" \
--header "Sec-WebSocket-Key: aGVsbG8K" \
http://localhost:8880/v0/envelope/access
```
143 changes: 108 additions & 35 deletions cmd/envelope/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/websocket"
"github.com/justinas/alice"
"gopkg.in/square/go-jose.v2/jwt"

"github.com/m-lab/access/address"
"github.com/m-lab/access/chanio"
"github.com/m-lab/access/controller"
"github.com/m-lab/access/token"
"github.com/m-lab/go/flagx"
Expand Down Expand Up @@ -82,44 +85,32 @@ func customFormat(w io.Writer, p handlers.LogFormatterParams) {
}

func (env *envelopeHandler) AllowRequest(rw http.ResponseWriter, req *http.Request) {
// AllowRequest is a state-changing POST method.
if req.Method != http.MethodPost {
// Websocket requests must be GET. Also note that AllowRequest is a
// state-changing operation.
if req.Method != http.MethodGet {
rw.WriteHeader(http.StatusMethodNotAllowed)
return
}

cl := controller.GetClaim(req.Context())
if cl == nil {
// This could happen if the TokenController is disabled.
logx.Debug.Println("missing claim")
rw.WriteHeader(http.StatusInternalServerError)
return
}

if cl.Subject != env.subject {
logx.Debug.Println("wrong subject claim")
rw.WriteHeader(http.StatusBadRequest)
return
}

// Tests may run (possibly repeatedly) until the claim expires.
deadline := cl.Expiry.Time()
if deadline.Before(time.Now()) {
logx.Debug.Println("already past expiration")
// Use client remote address as the basis of granting temporary subnet access.
host, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
logx.Debug.Println("failed to split remote addr:", err)
rw.WriteHeader(http.StatusBadRequest)
return
}

// Use client remote address as the basis of granting temporary subnet access.
host, _, err := net.SplitHostPort(req.RemoteAddr)
// Get deadline based on token claim.
cl := controller.GetClaim(req.Context())
deadline, err := env.getDeadline(cl)
if err != nil {
logx.Debug.Println("failed to split remote addr")
logx.Debug.Println("failed to get deadline:", err)
rw.WriteHeader(http.StatusBadRequest)
return
}

allow := net.ParseIP(host)
err = env.Grant(allow)
remote := net.ParseIP(host)
err = env.Grant(remote)
switch {
case err == address.ErrMaxConcurrent:
logx.Debug.Println("grant limit reached")
Expand All @@ -131,19 +122,90 @@ func (env *envelopeHandler) AllowRequest(rw http.ResponseWriter, req *http.Reque
return
}

ctx, cancel := context.WithDeadline(req.Context(), deadline)
defer cancel()
// Keep the lease until:
// * client disconnects.
// * timeout expires.
// * parent context is cancelled.
<-ctx.Done()
conn := setupConn(rw, req)
if conn == nil {
logx.Debug.Println("setup websocket conn failed")
rw.WriteHeader(http.StatusInternalServerError)
// TODO: handle panic.
rtx.PanicOnError(env.Revoke(remote), "Failed to remove rule for "+remote.String())
return
}

// At this point, we want to wait for either the deadline (when the envelope
// service closes the connection) or the client to close the websocket conn
// (to signal completion).
env.wait(req.Context(), conn, deadline)

// TODO: handle panic.
rtx.PanicOnError(env.Revoke(allow), "Failed to remove rule for "+allow.String())
rtx.PanicOnError(env.Revoke(remote), "Failed to remove rule for "+remote.String())
}

func (env *envelopeHandler) getDeadline(cl *jwt.Claims) (time.Time, error) {
if cl == nil && requireTokens {
logx.Debug.Println("missing claim")
return time.Time{}, fmt.Errorf("missing claim when tokens required")
}

if cl == nil {
// This could happen if tokens are not required.
return time.Now().Add(time.Minute), nil
}

if cl.Subject != env.subject {
logx.Debug.Println("wrong subject claim")
return time.Time{}, fmt.Errorf("wrong claim subject")
}

// Tests may run (possibly repeatedly) until the claim expires.
deadline := cl.Expiry.Time()
if deadline.Before(time.Now()) {
logx.Debug.Println("already past expiration")
return time.Time{}, fmt.Errorf("already past claim expiration")
}
return deadline, nil
}

func setupConn(writer http.ResponseWriter, request *http.Request) *websocket.Conn {
headers := http.Header{}
headers.Add("Sec-WebSocket-Protocol", "net.measurementlab.envelope")
upgrader := websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// Allow cross origin resource sharing
return true
},
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
conn, err := upgrader.Upgrade(writer, request, headers)
if err != nil {
logx.Debug.Println("failed to upgrade", err)
return nil
}
return conn
}

func (env *envelopeHandler) wait(ctx context.Context, c *websocket.Conn, dl time.Time) {
// NOTE: we are explicitly ignoring the error value from SetDeadline.
// Any error there will show up on read below.
c.SetReadDeadline(dl)
c.SetWriteDeadline(dl)
ctxdl, cancel := context.WithDeadline(ctx, dl)
defer cancel()
// Clean up client connection upon return.
defer c.Close()

// Keep the client connection open and the IP grant enabled until:
// * parent context expires.
// * context deadline expires.
// * client disconnects (or writes data that we don't expect).
select {
case <-ctxdl.Done():
case <-chanio.ReadOnce(c.UnderlyingConn()):
}
}

var mainCtx, mainCancel = context.WithCancel(context.Background())
var getEnvelopeHandler = func(subject string, mgr *address.IPManager) envelopeHandler {
var getEnvelopeHandler = func(subject string, mgr address.Manager) envelopeHandler {
return envelopeHandler{
manager: mgr,
subject: subject,
Expand All @@ -161,7 +223,12 @@ func main() {
verify, err := token.NewVerifier(verifyKeys.Get()...)
rtx.Must(err, "Failed to create token verifier")

mgr := address.NewIPManager(maxIPs)
var mgr address.Manager
if requireTokens {
mgr = address.NewIPManager(maxIPs)
} else {
mgr = &address.NullManager{}
}
env := getEnvelopeHandler(subject, mgr)
ctl, _ := controller.Setup(mainCtx, verify, requireTokens, machine)
// Handle all requests using the alice http handler chaining library.
Expand All @@ -172,6 +239,12 @@ func main() {
srv := &http.Server{
Addr: listenAddr,
Handler: ac.Then(mux),

// NOTE: prevent connections from staying open indefinitely.
// And, these timeouts are reset for individual clients that
// negotiate the websocket connection.
ReadTimeout: time.Minute,
WriteTimeout: time.Minute,
}
_, port, err := net.SplitHostPort(listenAddr)
rtx.Must(err, "failed to split listen address: %q", listenAddr)
Expand Down
Loading